This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a089ad6e4 Optimize RowNumberReader to be 8x faster (#9680)
7a089ad6e4 is described below

commit 7a089ad6e4767e181c00fa5d1d28cfd9ef96b16d
Author: Samyak Sarnayak <[email protected]>
AuthorDate: Thu Apr 16 02:26:23 2026 +0530

    Optimize RowNumberReader to be 8x faster (#9680)
    
    # Which issue does this PR close?
    
    - Closes None
    
    # Rationale for this change
    
    We internally found `RowNumberReader` to be a hot path in some of our
    queries. Flamegraphs showed ~70% of the CPU time spent in
    `RowNumberReader` methods.
    
    These can be made an order of magnitude faster (benchmarks below).
    
    # What changes are included in this PR?
    
    - Instead of storing an iterator over individual row numbers, we now
    store a vec of ranges.
      - These ranges are not materialized into a full array until needed.
    - `read_records` was previously linear in the number of rows read.
      - It is now close to constant, since one batch (8192 rows) is
    usually satisfied by a single row range (which comes from one row
    group).
      - The same applies to `skip_records`.
    - `consume_batch` is still linear in the number of rows, but it is
    faster because it can pre-allocate the output vec.
      - Previously, the `Flatten` iterator prevented pre-allocation (it is
    not an `ExactSizeIterator`).
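    The range-splitting idea can be sketched as a small standalone toy.
    `RangeReader` below is a hypothetical simplification for illustration
    (names mirror the patch, but it is not the actual `RowNumberReader`
    code):

    ```rust
    use std::collections::VecDeque;
    use std::ops::Range;

    struct RangeReader {
        /// Row ranges not read yet, one per "row group".
        remaining: VecDeque<Range<i64>>,
        /// Ranges read but not yet emitted.
        buffered: Vec<Range<i64>>,
    }

    impl RangeReader {
        fn new(ranges: Vec<Range<i64>>) -> Self {
            Self { remaining: ranges.into(), buffered: Vec::new() }
        }

        /// Take up to `count` rows from the front range, splitting it if needed.
        fn take_range(&mut self, count: usize) -> Option<Range<i64>> {
            let first = self.remaining.front_mut()?;
            if (first.end - first.start) <= count as i64 {
                // The whole range fits in the request: take it out.
                self.remaining.pop_front()
            } else {
                // Split: take the front part, keep the rest queued.
                let split = first.start + count as i64;
                let taken = first.start..split;
                first.start = split;
                Some(taken)
            }
        }

        /// O(ranges touched), not O(rows): one batch is usually one range.
        fn read_records(&mut self, batch_size: usize) -> usize {
            let mut remaining = batch_size;
            while remaining > 0 {
                let Some(r) = self.take_range(remaining) else { break };
                remaining -= (r.end - r.start) as usize;
                self.buffered.push(r);
            }
            batch_size - remaining
        }

        /// Linear in rows, but pre-allocates since the total length is
        /// known from the buffered ranges.
        fn consume(&mut self) -> Vec<i64> {
            let total: i64 = self.buffered.iter().map(|r| r.end - r.start).sum();
            let mut out = Vec::with_capacity(total as usize);
            for r in self.buffered.drain(..) {
                out.extend(r);
            }
            out
        }
    }
    ```

    For example, `RangeReader::new(vec![0..5, 5..10])` followed by
    `read_records(7)` pops the first range whole and splits the second at 7,
    so `consume()` yields `0..7` from a single pre-allocated vec.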
    
    # Are these changes tested?
    
    - Yes, added more unit tests
    - I have some benchmarks at https://github.com/Samyak2/arrow-rs/pull/1,
    but they need `RowNumberReader` to be pub, so I've not included them
    here
    
    Before:
    ```
    Benchmarking row_number_read_consume: Warming up for 3.0000 s
    Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 7.0s, enable flat sampling, or reduce sample count to 50.
    row_number_read_consume time:   [1.3915 ms 1.3967 ms 1.4035 ms]
    Found 11 outliers among 100 measurements (11.00%)
      1 (1.00%) low severe
      1 (1.00%) low mild
      5 (5.00%) high mild
      4 (4.00%) high severe
    
    row_number_skip_and_read
                            time:   [716.61 µs 718.14 µs 719.91 µs]
    Found 6 outliers among 100 measurements (6.00%)
      1 (1.00%) low severe
      1 (1.00%) low mild
      3 (3.00%) high mild
      1 (1.00%) high severe
    ```
    
    After:
    ```
    row_number_read_consume time:   [159.00 µs 160.81 µs 162.68 µs]
                            change: [−88.900% −88.721% −88.505%] (p = 0.00 < 0.05)
                            Performance has improved.
    Found 3 outliers among 100 measurements (3.00%)
      1 (1.00%) low mild
      2 (2.00%) high mild
    
    row_number_skip_and_read
                            time:   [79.057 µs 79.924 µs 80.846 µs]
                            change: [−89.025% −88.865% −88.712%] (p = 0.00 < 0.05)
                            Performance has improved.
    Found 2 outliers among 100 measurements (2.00%)
      2 (2.00%) high mild
    ```
    
    Ranging from **8.6x to 8.9x faster**!
    
    # Are there any user-facing changes?
    
    No
---
 parquet/src/arrow/array_reader/row_number.rs | 147 ++++++++++++++++++++++-----
 1 file changed, 123 insertions(+), 24 deletions(-)

diff --git a/parquet/src/arrow/array_reader/row_number.rs b/parquet/src/arrow/array_reader/row_number.rs
index f9e60a2c0d..def88641a1 100644
--- a/parquet/src/arrow/array_reader/row_number.rs
+++ b/parquet/src/arrow/array_reader/row_number.rs
@@ -21,12 +21,21 @@ use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use arrow_array::{ArrayRef, Int64Array};
 use arrow_schema::DataType;
 use std::any::Any;
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
+use std::ops::Range;
 use std::sync::Arc;
 
+/// Tracks row numbers within a Parquet file and emits them as an `Int64Array`.
 pub(crate) struct RowNumberReader {
-    buffered_row_numbers: Vec<i64>,
-    remaining_row_numbers: std::iter::Flatten<std::vec::IntoIter<std::ops::Range<i64>>>,
+    /// Pre-computed row ranges that are not read yet.
+    ///
+    /// This reader only keeps track of the ranges of row numbers for each row group. The range is
+    /// not materialized into a full array until it's needed.
+    remaining_row_ranges: VecDeque<Range<i64>>,
+    /// Row ranges read but not emitted.
+    ///
+    /// These are either full or partial (split) row ranges taken from `remaining_row_ranges`.
+    buffered_row_ranges: Vec<Range<i64>>,
 }
 
 impl RowNumberReader {
@@ -49,7 +58,7 @@ impl RowNumberReader {
         // Pass 2: Build ranges in the order specified by the row_groups iterator
         // This is O(N) where N is the number of selected row groups
         // This preserves the user's requested order instead of sorting by ordinal
-        let ranges: Vec<_> = row_groups
+        let ranges: VecDeque<_> = row_groups
             .map(|rg| {
                 let ordinal = rg.ordinal().ok_or_else(|| {
                     ParquetError::General(
@@ -70,23 +79,52 @@ impl RowNumberReader {
             .collect::<Result<_>>()?;
 
         Ok(Self {
-            buffered_row_numbers: Vec::new(),
-            remaining_row_numbers: ranges.into_iter().flatten(),
+            buffered_row_ranges: Vec::new(),
+            remaining_row_ranges: ranges,
         })
     }
+
+    /// Take up to `count` rows from the first range, splitting it if needed.
+    ///
+    /// Returns `None` if no ranges remain.
+    fn take_range(&mut self, count: usize) -> Option<Range<i64>> {
+        let first = self.remaining_row_ranges.front_mut()?;
+        if (first.end - first.start) <= count as i64 {
+            // take out the full range
+            self.remaining_row_ranges.pop_front()
+        } else {
+            // first range has more rows than we need.
+            // so we split the range and put the remaining back.
+            let split = first.start + count as i64;
+            let taken = first.start..split;
+            first.start = split;
+            Some(taken)
+        }
+    }
 }
 
 impl ArrayReader for RowNumberReader {
     fn read_records(&mut self, batch_size: usize) -> Result<usize> {
-        let starting_len = self.buffered_row_numbers.len();
-        self.buffered_row_numbers
-            .extend((&mut self.remaining_row_numbers).take(batch_size));
-        Ok(self.buffered_row_numbers.len() - starting_len)
+        let mut remaining = batch_size;
+        while remaining > 0 {
+            let Some(range) = self.take_range(remaining) else {
+                break;
+            };
+            remaining -= (range.end - range.start) as usize;
+            self.buffered_row_ranges.push(range);
+        }
+        Ok(batch_size - remaining)
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
-        // TODO: Use advance_by when it stabilizes to improve performance
-        Ok((&mut self.remaining_row_numbers).take(num_records).count())
+        let mut remaining = num_records;
+        while remaining > 0 {
+            let Some(range) = self.take_range(remaining) else {
+                break;
+            };
+            remaining -= (range.end - range.start) as usize;
+        }
+        Ok(num_records - remaining)
     }
 
     fn as_any(&self) -> &dyn Any {
@@ -98,9 +136,18 @@ impl ArrayReader for RowNumberReader {
     }
 
     fn consume_batch(&mut self) -> Result<ArrayRef> {
-        Ok(Arc::new(Int64Array::from_iter(
-            self.buffered_row_numbers.drain(..),
-        )))
+        let total_rows: i64 = self
+            .buffered_row_ranges
+            .iter()
+            .map(|range| range.end - range.start)
+            .sum();
+        let mut result = Vec::with_capacity(total_rows as usize);
+
+        for range in self.buffered_row_ranges.drain(..) {
+            result.extend(range);
+        }
+
+        Ok(Arc::new(Int64Array::from(result)))
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
@@ -168,6 +215,21 @@ mod tests {
         ParquetMetaData::new(file_metadata, row_group_metas)
     }
 
+    fn consume_row_numbers(reader: &mut RowNumberReader) -> Vec<i64> {
+        let array = reader.consume_batch().unwrap();
+        array
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap()
+            .values()
+            .to_vec()
+    }
+
+    fn reader_for_all(metadata: &ParquetMetaData) -> RowNumberReader {
+        let all_rgs: Vec<_> = metadata.row_groups().iter().collect();
+        RowNumberReader::try_new(metadata, all_rgs.into_iter()).unwrap()
+    }
+
     #[test]
     fn test_row_number_reader_reverse_order() {
         // Create metadata with 3 row groups, each with 2 rows
@@ -191,16 +253,53 @@ mod tests {
         let num_read = reader.read_records(6).unwrap();
         assert_eq!(num_read, 4); // Should read 4 rows total (2 from each selected group)
 
-        let array = reader.consume_batch().unwrap();
-        let row_numbers = array.as_any().downcast_ref::<Int64Array>().unwrap();
-
         // Expected: row group 2 first (rows 4-5), then row group 0 (rows 0-1)
-        let expected = vec![4, 5, 0, 1];
-        let actual: Vec<i64> = row_numbers.iter().map(|v| v.unwrap()).collect();
+        assert_eq!(consume_row_numbers(&mut reader), vec![4, 5, 0, 1]);
+    }
 
-        assert_eq!(
-            actual, expected,
-            "Row numbers should match the order of selected row groups, not file order"
-        );
+    #[test]
+    fn test_range_splitting_across_batches() {
+        // One row group with 10 rows
+        let metadata = create_test_parquet_metadata(vec![(0, 10)]);
+        let mut reader = reader_for_all(&metadata);
+
+        assert_eq!(reader.read_records(3).unwrap(), 3);
+        assert_eq!(consume_row_numbers(&mut reader), vec![0, 1, 2]);
+
+        assert_eq!(reader.read_records(3).unwrap(), 3);
+        assert_eq!(consume_row_numbers(&mut reader), vec![3, 4, 5]);
+
+        assert_eq!(reader.read_records(3).unwrap(), 3);
+        assert_eq!(consume_row_numbers(&mut reader), vec![6, 7, 8]);
+
+        // Only 1 row left, requesting 3
+        assert_eq!(reader.read_records(3).unwrap(), 1);
+        assert_eq!(consume_row_numbers(&mut reader), vec![9]);
+    }
+
+    #[test]
+    fn test_interleaved_skip_and_read() {
+        // Row group 0: rows 0..5
+        // Row group 1: rows 5..10
+        let metadata = create_test_parquet_metadata(vec![(0, 5), (1, 5)]);
+        let mut reader = reader_for_all(&metadata);
+
+        assert_eq!(reader.skip_records(2).unwrap(), 2); // skip [0,1]
+        assert_eq!(reader.read_records(2).unwrap(), 2); // read [2,3]
+        assert_eq!(reader.skip_records(3).unwrap(), 3); // skip [4] then [5,6]
+        assert_eq!(reader.read_records(3).unwrap(), 3); // read [7,8,9]
+
+        assert_eq!(consume_row_numbers(&mut reader), vec![2, 3, 7, 8, 9]);
+    }
+
+    #[test]
+    fn test_skip_then_read() {
+        // One row group with 10 rows
+        let metadata = create_test_parquet_metadata(vec![(0, 10)]);
+        let mut reader = reader_for_all(&metadata);
+
+        assert_eq!(reader.skip_records(3).unwrap(), 3);
+        assert_eq!(reader.read_records(4).unwrap(), 4);
+        assert_eq!(consume_row_numbers(&mut reader), vec![3, 4, 5, 6]);
     }
 }
