This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 7b335b7712 fix: prevent panic in record reader when row group metadata
overcounts num_rows (#9993)
7b335b7712 is described below
commit 7b335b7712089c1270ab1a989336908a240cc812
Author: BoazC-MSFT <[email protected]>
AuthorDate: Fri May 22 21:26:57 2026 +0300
fix: prevent panic in record reader when row group metadata overcounts
num_rows (#9993)
# Which issue does this PR close?
- Closes #9992.
# Rationale for this change
The record reader (`RowIter` / `get_row_iter`) panics with `index out of
bounds` when a Parquet file's row group metadata declares more rows than
a column chunk actually contains. This happens in production when
reading third-party Parquet files with mismatched metadata. Instead of
panicking, the reader should return an error.
# What changes are included in this PR?
Three fixes across two files in `parquet/src/record/`:
**triplet.rs - fix the inconsistent internal state:**
- Reset `curr_triplet_index` to 0 in the exhaustion path of `read_next`,
so the stale index from the previous batch never persists alongside
empty buffers.
- Return 0 from `current_def_level` and `current_rep_level` when
`has_next` is false, as defense-in-depth against any caller that skips
the `has_next` check.
**reader.rs - return errors instead of panicking:**
- Add `has_next()` guards before consuming column data in all
`read_field` variants: `PrimitiveReader`, `OptionReader`,
`RepeatedReader`, and `KeyValueReader`. When a column is exhausted
mid-iteration, `read_field` now returns `Err("Unexpected end of column
data")` which propagates through `ReaderIter::next` as `Some(Err(...))`.
# Are these changes tested?
Yes. Five new tests:
- `test_current_def_level_safe_after_exhaustion` - drives a
`TripletIter` to exhaustion on an optional column and asserts
`current_def_level()` returns 0 instead of panicking.
- `test_current_rep_level_safe_after_exhaustion` - same for
`current_rep_level()` on a repeated column.
- `test_reader_iter_returns_error_when_num_records_exceeds_data` -
exercises the full `ReaderIter` stack with an optional field (via
`nulls.snappy.parquet`).
- `test_reader_iter_returns_error_for_repeated_field_when_num_records_exceeds_data` -
same check for a repeated primitive field (via
`repeated_primitive_no_list.parquet`).
- `test_reader_iter_returns_error_for_map_field_when_num_records_exceeds_data` -
same check for a map field projected alone (via `map_no_value.parquet`).
Each integration test inflates `num_records` by 1 beyond actual data,
asserts all real rows return `Ok`, and asserts the extra iteration
returns `Err` containing "Unexpected end of column data".
# Are there any user-facing changes?
Callers of `get_row_iter` or `RowIter` that previously hit a panic on
corrupt/truncated files will now receive an `Err` from the iterator
instead. No API signature changes.
---
parquet/src/record/reader.rs | 65 +++++++++++++++++++++++++++++++++++++++++++
parquet/src/record/triplet.rs | 44 +++++++++++++++++++++++++++++
2 files changed, 109 insertions(+)
diff --git a/parquet/src/record/reader.rs b/parquet/src/record/reader.rs
index a6b8d2d54c..bee49c70d8 100644
--- a/parquet/src/record/reader.rs
+++ b/parquet/src/record/reader.rs
@@ -437,11 +437,17 @@ impl Reader {
fn read_field(&mut self) -> Result<Field> {
let field = match *self {
Reader::PrimitiveReader(_, ref mut column) => {
+ if !column.has_next() {
+ return Err(general_err!("Unexpected end of column data"));
+ }
let value = column.current_value()?;
column.read_next()?;
value
}
Reader::OptionReader(def_level, ref mut reader) => {
+ if !reader.has_next() {
+ return Err(general_err!("Unexpected end of column data"));
+ }
if reader.current_def_level() > def_level {
reader.read_field()?
} else {
@@ -465,6 +471,9 @@ impl Reader {
Field::Group(row)
}
Reader::RepeatedReader(_, def_level, rep_level, ref mut reader) =>
{
+ if !reader.has_next() {
+ return Err(general_err!("Unexpected end of column data"));
+ }
let mut elements = Vec::new();
loop {
if reader.current_def_level() > def_level {
@@ -488,6 +497,9 @@ impl Reader {
Field::ListInternal(make_list(elements))
}
Reader::KeyValueReader(_, def_level, rep_level, ref mut keys, ref
mut values) => {
+ if !keys.has_next() {
+ return Err(general_err!("Unexpected end of column data"));
+ }
let mut pairs = Vec::new();
loop {
if keys.current_def_level() > def_level {
@@ -1912,4 +1924,57 @@ mod tests {
),]];
assert_eq!(rows, expected_rows);
}
+
+ fn assert_err_on_overcount(file_name: &str, proj_schema: Option<Type>) {
+ let file = get_test_file(file_name);
+ let file_reader = SerializedFileReader::new(file).unwrap();
+ let metadata = file_reader.metadata();
+ let row_group_reader = file_reader.get_row_group(0).unwrap();
+ let actual_rows = row_group_reader.metadata().num_rows() as usize;
+
+ let descr = match proj_schema {
+ Some(schema) => Arc::new(SchemaDescriptor::new(Arc::new(schema))),
+ None => metadata.file_metadata().schema_descr_ptr(),
+ };
+ let reader = TreeBuilder::new().build(descr,
&*row_group_reader).unwrap();
+ let iter = ReaderIter::new(reader, actual_rows + 1).unwrap();
+
+ let rows: Vec<Result<Row>> = iter.collect();
+ assert_eq!(rows.len(), actual_rows + 1);
+ for row in &rows[..actual_rows] {
+ assert!(row.is_ok(), "Expected Ok row, got: {:?}", row);
+ }
+ let err = rows[actual_rows].as_ref().unwrap_err();
+ assert!(
+ err.to_string().contains("Unexpected end of column data"),
+ "Unexpected error message: {}",
+ err
+ );
+ }
+
+ #[test]
+ fn test_reader_iter_returns_error_when_num_records_exceeds_data() {
+ assert_err_on_overcount("nulls.snappy.parquet", None);
+ }
+
+ #[test]
+ fn
test_reader_iter_returns_error_for_repeated_field_when_num_records_exceeds_data()
{
+ assert_err_on_overcount("repeated_primitive_no_list.parquet", None);
+ }
+
+ #[test]
+ fn
test_reader_iter_returns_error_for_map_field_when_num_records_exceeds_data() {
+ let schema = parse_message_type(
+ "message schema {
+ REQUIRED group my_map (MAP) {
+ REPEATED group key_value {
+ REQUIRED INT32 key;
+ OPTIONAL INT32 value;
+ }
+ }
+ }",
+ )
+ .unwrap();
+ assert_err_on_overcount("map_no_value.parquet", Some(schema));
+ }
}
diff --git a/parquet/src/record/triplet.rs b/parquet/src/record/triplet.rs
index 8244dfb128..b4d39bbbd9 100644
--- a/parquet/src/record/triplet.rs
+++ b/parquet/src/record/triplet.rs
@@ -263,6 +263,9 @@ impl<T: DataType> TypedTripletIter<T> {
/// If field is required, then maximum definition level is returned.
#[inline]
fn current_def_level(&self) -> i16 {
+ if !self.has_next {
+ return 0;
+ }
match self.def_levels {
Some(ref vec) => vec[self.curr_triplet_index],
None => self.max_def_level,
@@ -273,6 +276,9 @@ impl<T: DataType> TypedTripletIter<T> {
/// If field is required, then maximum repetition level is returned.
#[inline]
fn current_rep_level(&self) -> i16 {
+ if !self.has_next {
+ return 0;
+ }
match self.rep_levels {
Some(ref vec) => vec[self.curr_triplet_index],
None => self.max_rep_level,
@@ -315,6 +321,7 @@ impl<T: DataType> TypedTripletIter<T> {
// No more values or levels to read
if records_read == 0 && values_read == 0 && levels_read == 0 {
+ self.curr_triplet_index = 0;
self.has_next = false;
return Ok(false);
}
@@ -561,4 +568,41 @@ mod tests {
assert_eq!(def_levels, expected_def_levels);
assert_eq!(rep_levels, expected_rep_levels);
}
+
+ fn open_triplet_iter(file_name: &str, path: &[&str], batch_size: usize) ->
TripletIter {
+ let column_path = ColumnPath::from(path.iter().map(|x|
x.to_string()).collect::<Vec<_>>());
+ let file = get_test_file(file_name);
+ let file_reader = SerializedFileReader::new(file).unwrap();
+ let metadata = file_reader.metadata();
+ let schema = metadata.file_metadata().schema_descr();
+ let row_group_reader = file_reader.get_row_group(0).unwrap();
+ for i in 0..schema.num_columns() {
+ let descr = schema.column(i);
+ if descr.path() == &column_path {
+ let reader = row_group_reader.get_column_reader(i).unwrap();
+ return TripletIter::new(descr.clone(), reader, batch_size);
+ }
+ }
+ panic!("Column {column_path:?} not found in {file_name}");
+ }
+
+ #[test]
+ fn test_current_def_level_safe_after_exhaustion() {
+ let mut iter = open_triplet_iter("nulls.snappy.parquet", &["b_struct",
"b_c_int"], 256);
+ while let Ok(true) = iter.read_next() {}
+ assert!(!iter.has_next());
+ assert_eq!(iter.current_def_level(), 0);
+ }
+
+ #[test]
+ fn test_current_rep_level_safe_after_exhaustion() {
+ let mut iter = open_triplet_iter(
+ "nested_lists.snappy.parquet",
+ &["a", "list", "element", "list", "element", "list", "element"],
+ 256,
+ );
+ while let Ok(true) = iter.read_next() {}
+ assert!(!iter.has_next());
+ assert_eq!(iter.current_rep_level(), 0);
+ }
}