This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new d2e2cdafed Fix skip_records over-counting when partial record precedes num_rows page skip (#9374)
d2e2cdafed is described below

commit d2e2cdafed93a8e0152fe1d018ec2cef154ccb20
Author: Jonas Dedden <[email protected]>
AuthorDate: Mon Mar 9 21:32:53 2026 +0100

    Fix skip_records over-counting when partial record precedes num_rows page skip (#9374)
    
    # Which issue does this PR close?
    
    - Closes #9370.
    
    # Rationale for this change
    
    The bug occurs when using RowSelection with nested types (like
    List<String>) under the following conditions:
    1. A column has multiple pages in a row group
    2. The selected rows span across page boundaries
    3. The first page is entirely consumed during skip operations
    
    The issue was in `arrow-rs/parquet/src/column/reader.rs:287-382`
    (`skip_records` function).
    
    **Root cause:** When `skip_records` completed successfully after
    crossing page boundaries, the `has_partial` state in the
    `RepetitionLevelDecoder` could incorrectly remain true.
    
    This happened when:
    
    - The skip operation exhausted a page where has_record_delimiter was
    false
    - The skip found the remaining records on the next page by counting a
    delimiter at index 0
    - When a subsequent read_records(1) was called, the stale
    has_partial=true state caused count_records to incorrectly interpret the
    first repetition level (0) at index 0 as ending a "phantom" partial
    record, returning (1 record, 0 levels, 0 values) instead of properly
    reading the actual record data.
    
    For a more detailed explanation, see:
    https://github.com/apache/arrow-rs/issues/9370#issuecomment-3861143928
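
    The stale-state mechanism described above can be sketched with a
    minimal model. This is illustrative only: `count_records` and
    `has_partial` here are simplified stand-ins, not the real
    RepetitionLevelDecoder in parquet-rs.

    ```rust
    /// Count complete records among `rep_levels`, stopping after
    /// `max_records`. A repetition level of 0 starts a new record;
    /// `has_partial` carries the "a record is still open" state across
    /// buffers (pages). Returns (records counted, levels consumed).
    pub fn count_records(
        rep_levels: &[i16],
        has_partial: &mut bool,
        max_records: usize,
    ) -> (usize, usize) {
        let mut records = 0;
        for (i, &level) in rep_levels.iter().enumerate() {
            if level == 0 {
                // rep=0 closes whatever record was open: either one begun
                // earlier in this buffer (i > 0) or one carried over from
                // a previous buffer (*has_partial).
                if i > 0 || *has_partial {
                    records += 1;
                    if records == max_records {
                        *has_partial = false;
                        return (records, i); // stop before the delimiter
                    }
                }
            }
        }
        // Buffer exhausted: the last record is still open.
        *has_partial = true;
        (records, rep_levels.len())
    }
    ```

    With a stale `has_partial = true`, calling this on a fresh page whose
    levels are `[0, 1]` returns (1 record, 0 levels consumed): the phantom
    record. With a clean state it returns (0, 2), and the real decoder then
    closes the open record at a page-end record boundary.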
    
    # What changes are included in this PR?
    
    Added code at the end of skip_records to reset the partial record state
    when all requested records have been successfully skipped.
    
    This ensures that after skip_records completes, we're at a clean record
    boundary with no lingering partial record state, fixing the array length
    mismatch in StructArrayReader.
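
    The invariant this restores can be sketched as follows. Hedged:
    `DecoderState` and `reset_partial_after_skip` are invented names for
    illustration, not the actual arrow-rs API.

    ```rust
    /// Invented stand-in for the repetition-level decoder state.
    pub struct DecoderState {
        pub has_partial: bool,
    }

    impl DecoderState {
        /// Intended to run when skip_records has satisfied its request: a
        /// completed skip leaves the reader on a record boundary, so any
        /// partial-record marker left behind by an exhausted page is stale
        /// and must be cleared before the next read_records call.
        pub fn reset_partial_after_skip(&mut self, records_remaining: usize) {
            if records_remaining == 0 {
                self.has_partial = false;
            }
        }
    }
    ```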
    
    # Are these changes tested?
    
    Commit
    https://github.com/apache/arrow-rs/commit/365bd9a4ced7897f391e4533930a0c9683952723
    introduces a unit-level test showcasing this issue using only v2 data
    pages. PR https://github.com/apache/arrow-rs/pull/9399 can be used to
    showcase the issue in an end-to-end way.
    
    An earlier, incorrect assumption was that the bug required mixing v1
    and v2 data pages:
    
    ```
    In b52e043 I added a test that I validated to fail whenever I remove my fix.
    
      Bug Mechanism
    
      The bug requires three ingredients:
    
      1. Page 1 (DataPage v1): Contains a nested column (with rep levels).
         During skip_records, all levels on this page are consumed.
         count_records sees no following rep=0 delimiter, so it sets
         has_partial=true. Since has_record_delimiter is false (the default
         InMemoryPageReader returns false when more pages exist),
         flush_partial is not called.
      2. Page 2 (DataPage v2): Has num_rows available in its metadata. When
         num_rows <= remaining_records, the entire page is skipped via
         skip_next_page() — this does not touch the rep level decoder at
         all, so has_partial remains stale true from page 1.
      3. Page 3 (DataPage v1): When read_records loads this page, the stale
         has_partial=true causes the rep=0 at position 0 to be
         misinterpreted as completing a "phantom" partial record. This
         produces (1 record, 0 levels, 0 values) instead of reading the
         actual record data.
    
      Test Verification
    
      - With fix (flush_partial at end of skip_records): read_records(1)
        correctly returns (1, 2, 2) with values [70, 80]
      - Without fix: read_records(1) returns (1, 0, 0) — a phantom record
        with no data, which is what causes the "Not all children array
        length are the same!" error when different sibling columns in a
        struct produce different record counts
    ```
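
    The whole-page-skip decision described above can be modeled in a
    self-contained way. Assumption: `whole_page_skip` is an invented helper
    for illustration; the real logic lives in parquet/src/column/reader.rs,
    and `flush_fix` here stands in for the fix.

    ```rust
    /// Decide whether a v2 page (with `num_rows` known) can be skipped
    /// without decoding it. `has_partial` is the stale-prone state from
    /// the previous page; with `flush_fix`, a pending partial record is
    /// counted before the comparison, because a v2 page is required to
    /// start at a record boundary. Returns Some(rows skipped wholesale)
    /// or None when the page must be loaded and skipped level by level.
    pub fn whole_page_skip(
        num_rows: usize,
        remaining: &mut usize,
        has_partial: &mut bool,
        flush_fix: bool,
    ) -> Option<usize> {
        if flush_fix && *has_partial {
            // The record left open by the previous page ends where this
            // v2 page begins, so it counts as one skipped record.
            *has_partial = false;
            *remaining -= 1;
        }
        if num_rows <= *remaining {
            // A whole-page skip never touches the level decoder, so
            // without the flush above a stale has_partial survives it.
            *remaining -= num_rows;
            Some(num_rows)
        } else {
            None
        }
    }
    ```

    With remaining=2 and a stale partial, the unfixed path skips the whole
    2-row page (effectively 3 records, the over-count); the fixed path
    first reduces remaining to 1, so the page is loaded instead.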
    
    ---------
    
    Co-authored-by: Ed Seidl <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 parquet/src/column/page.rs                    |   9 +-
 parquet/src/column/reader.rs                  | 131 ++++++++++++++++++++++++++
 parquet/src/file/serialized_reader.rs         |   7 +-
 parquet/tests/arrow_reader/row_filter/sync.rs |   2 -
 4 files changed, 145 insertions(+), 4 deletions(-)

diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index f18b296c1c..4cfc07a028 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -406,7 +406,14 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
     /// [(#4327)]: https://github.com/apache/arrow-rs/pull/4327
     /// [(#4943)]: https://github.com/apache/arrow-rs/pull/4943
     fn at_record_boundary(&mut self) -> Result<bool> {
-        Ok(self.peek_next_page()?.is_none())
+        match self.peek_next_page()? {
+            // Last page in the column chunk - always a record boundary
+            None => Ok(true),
+            // A V2 data page is required by the parquet spec to start at a
+            // record boundary, so the current page ends at one.  V2 pages
+            // are identified by having `num_rows` set in their header.
+            Some(metadata) => Ok(metadata.num_rows.is_some()),
+        }
     }
 }
 
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 387a0602a6..29cb50185a 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -1361,4 +1361,135 @@ mod tests {
             );
         }
     }
+
+    /// Regression test for <https://github.com/apache/arrow-rs/issues/9370>
+    ///
+    /// Reproduces the production scenario: all DataPage v2 pages for a
+    /// list column (rep_level=1) read without an offset index (i.e.
+    /// `at_record_boundary` returns false for non-last pages).
+    ///
+    /// When a prior operation (here `skip_records(1)`) loads a v2 page,
+    /// and a subsequent `skip_records` exhausts the remaining levels on
+    /// that page, the rep level decoder is left with `has_partial=true`.
+    /// Because `has_record_delimiter` is false, the partial is not
+    /// flushed during level-based processing. When the next v2 page is
+    /// then peeked with `num_rows` available, the whole-page-skip
+    /// shortcut must flush the pending partial first. Otherwise:
+    ///
+    /// 1. The skip over-counts (skips N+1 records instead of N), and
+    /// 2. The stale `has_partial` causes a subsequent `read_records` to
+    ///    produce a "phantom" record with 0 values.
+    #[test]
+    fn test_skip_records_v2_page_skip_accounts_for_partial() {
+        use crate::encodings::levels::LevelEncoder;
+
+        let max_rep_level: i16 = 1;
+        let max_def_level: i16 = 1;
+
+        // Column descriptor for a list element column (rep=1, def=1)
+        let primitive_type = SchemaType::primitive_type_builder("element", PhysicalType::INT32)
+            .with_repetition(Repetition::REQUIRED)
+            .build()
+            .unwrap();
+        let desc = Arc::new(ColumnDescriptor::new(
+            Arc::new(primitive_type),
+            max_def_level,
+            max_rep_level,
+            ColumnPath::new(vec!["list".to_string(), "element".to_string()]),
+        ));
+
+        // Helper: build a DataPage v2 for this list column.
+        let make_v2_page =
+            |rep_levels: &[i16], def_levels: &[i16], values: &[i32], num_rows: u32| -> Page {
+                let mut rep_enc = LevelEncoder::v2(max_rep_level, rep_levels.len());
+                rep_enc.put(rep_levels);
+                let rep_bytes = rep_enc.consume();
+
+                let mut def_enc = LevelEncoder::v2(max_def_level, def_levels.len());
+                def_enc.put(def_levels);
+                let def_bytes = def_enc.consume();
+
+                let val_bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
+
+                let mut buf = Vec::new();
+                buf.extend_from_slice(&rep_bytes);
+                buf.extend_from_slice(&def_bytes);
+                buf.extend_from_slice(&val_bytes);
+
+                Page::DataPageV2 {
+                    buf: Bytes::from(buf),
+                    num_values: rep_levels.len() as u32,
+                    encoding: Encoding::PLAIN,
+                    num_nulls: 0,
+                    num_rows,
+                    def_levels_byte_len: def_bytes.len() as u32,
+                    rep_levels_byte_len: rep_bytes.len() as u32,
+                    is_compressed: false,
+                    statistics: None,
+                }
+            };
+
+        // All pages are DataPage v2 (matching the production scenario where
+        // parquet-rs writes only v2 data pages and no offset index is loaded,
+        // so at_record_boundary() returns false for non-last pages).
+
+        // Page 1 (v2): 2 records × 2 elements = [10,20], [30,40]
+        let page1 = make_v2_page(&[0, 1, 0, 1], &[1, 1, 1, 1], &[10, 20, 30, 40], 2);
+
+        // Page 2 (v2): 2 records × 2 elements = [50,60], [70,80]
+        let page2 = make_v2_page(&[0, 1, 0, 1], &[1, 1, 1, 1], &[50, 60, 70, 80], 2);
+
+        // Page 3 (v2): 1 record × 2 elements = [90,100]
+        let page3 = make_v2_page(&[0, 1], &[1, 1], &[90, 100], 1);
+
+        // 5 records total: [10,20], [30,40], [50,60], [70,80], [90,100]
+        let pages = VecDeque::from(vec![page1, page2, page3]);
+        let page_reader = InMemoryPageReader::new(pages);
+        let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader));
+        let mut typed_reader = get_typed_column_reader::<Int32Type>(column_reader);
+
+        // Step 1 — skip 1 record:
+        //   Peek page 1: num_rows=2, remaining=1 → rows(2) > remaining(1),
+        //   so the page is LOADED (not whole-page-skipped).
+        //   Level-based skip consumes rep levels [0,1] for record [10,20],
+        //   stopping at the 0 that starts record [30,40].
+        let skipped = typed_reader.skip_records(1).unwrap();
+        assert_eq!(skipped, 1);
+
+        // Step 2 — skip 2 more records ([30,40] and [50,60]):
+        //   Mid-page in page 1 with 2 remaining levels [0,1] for [30,40].
+        //   skip_rep_levels(2, 2): the leading 0 does NOT act as a record
+        //   delimiter (has_partial=false, idx==0), so count_records returns
+        //   (true, 0, 2) — all levels consumed, has_partial=true, 0 records.
+        //
+        //   has_record_delimiter is false → no flush at page boundary.
+        //   Page 1 exhausted → peek page 2 (v2, num_rows=2).
+        //
+        //   With fix: flush_partial → remaining 2→1, page 2 NOT skipped
+        //   (rows=2 > remaining=1). Load page 2, skip 1 record [50,60].
+        //
+        //   Without fix: rows(2) <= remaining(2) → page 2 whole-page-skipped,
+        //   over-counting by 1. has_partial stays true (stale from page 1).
+        let skipped = typed_reader.skip_records(2).unwrap();
+        assert_eq!(skipped, 2);
+
+        // Step 3 — read 1 record:
+        let mut values = Vec::new();
+        let mut def_levels = Vec::new();
+        let mut rep_levels = Vec::new();
+
+        let (records, values_read, levels_read) = typed_reader
+            .read_records(1, Some(&mut def_levels), Some(&mut rep_levels), &mut values)
+            .unwrap();
+
+        // Without the fix: (1, 0, 0) — phantom record from stale has_partial;
+        //   the rep=0 on page 3 "completes" the phantom, yielding 0 values.
+        // With the fix:    (1, 2, 2) — correctly reads record [70, 80].
+        assert_eq!(records, 1, "should read exactly 1 record");
+        assert_eq!(levels_read, 2, "should read 2 levels for the record");
+        assert_eq!(values_read, 2, "should read 2 non-null values");
+        assert_eq!(values, vec![70, 80], "should contain 4th record's values");
+        assert_eq!(rep_levels, vec![0, 1], "rep levels for a 2-element list");
+        assert_eq!(def_levels, vec![1, 1], "def levels (all non-null)");
+    }
 }
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index b3b6383f78..254ccb779a 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -1158,7 +1158,12 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
 
     fn at_record_boundary(&mut self) -> Result<bool> {
         match &mut self.state {
-            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
+            SerializedPageReaderState::Values { .. } => match self.peek_next_page()? {
+                None => Ok(true),
+                // V2 data pages must start at record boundaries per the parquet
+                // spec, so the current page ends at one.
+                Some(metadata) => Ok(metadata.num_rows.is_some()),
+            },
             SerializedPageReaderState::Pages { .. } => Ok(true),
         }
     }
diff --git a/parquet/tests/arrow_reader/row_filter/sync.rs b/parquet/tests/arrow_reader/row_filter/sync.rs
index e59fa392cf..77a75220dc 100644
--- a/parquet/tests/arrow_reader/row_filter/sync.rs
+++ b/parquet/tests/arrow_reader/row_filter/sync.rs
@@ -206,7 +206,6 @@ fn test_row_filter_full_page_skip_is_handled() {
 /// Without the fix, the list column over-skips by one record, causing
 /// struct children to disagree on record counts.
 #[test]
-#[should_panic(expected = "StructArrayReader out of sync in read_records, expected 1 read, got 0")]
 fn test_row_selection_list_column_v2_page_boundary_skip() {
     use arrow_array::builder::{Int32Builder, ListBuilder};
 
@@ -327,7 +326,6 @@ fn test_row_selection_list_column_v2_page_boundary_skip() {
 /// bug causes one leaf to over-skip by one record while the other stays
 /// correct.
 #[test]
-#[should_panic(expected = "Not all children array length are the same!")]
 fn test_list_struct_page_boundary_desync_produces_length_mismatch() {
     use arrow_array::Array;
     use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder, StructBuilder};
