This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 29cd685061 Change some panics to errors in parquet decoder (#8602)
29cd685061 is described below
commit 29cd6850619216f862e517ad383cf5258133ba41
Author: Alex Stephen <[email protected]>
AuthorDate: Mon Oct 27 11:23:05 2025 -0700
Change some panics to errors in parquet decoder (#8602)
# Rationale for this change
Our internal testing uncovered some unexpected panics. We've put
in error checks for all of these so that they don't affect other users.
# What changes are included in this PR?
Various error checks to ensure panics don't occur.
# Are these changes tested?
Tests should continue to pass.
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
Existing tests should cover these changes.
# Are there any user-facing changes?
None.
---------
Co-authored-by: Ed Seidl <[email protected]>
Co-authored-by: Ryan Johnson <[email protected]>
---
parquet/src/column/page.rs | 2 +-
parquet/src/column/reader.rs | 34 ++++++++++++---
parquet/src/encodings/decoding.rs | 17 ++++++++
parquet/src/encodings/rle.rs | 5 ++-
parquet/src/file/reader.rs | 45 ++++++++++++++++++++
parquet/src/file/serialized_reader.rs | 75 +++++++++++++++++++++++++++++++++-
parquet/src/schema/types.rs | 28 +++++++------
parquet/tests/arrow_reader/bad_data.rs | 6 ++-
8 files changed, 190 insertions(+), 22 deletions(-)
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 23517f05df..f18b296c1c 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -31,7 +31,7 @@ use crate::file::statistics::{Statistics, page_stats_to_thrift};
/// List of supported pages.
/// These are 1-to-1 mapped from the equivalent Thrift definitions, except `buf` which
/// used to store uncompressed bytes of the page.
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub enum Page {
/// Data page Parquet format v1.
DataPage {
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index b8ff38efa3..ebde79e6a7 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -569,11 +569,16 @@ fn parse_v1_level(
match encoding {
Encoding::RLE => {
let i32_size = std::mem::size_of::<i32>();
- let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
- Ok((
- i32_size + data_size,
- buf.slice(i32_size..i32_size + data_size),
- ))
+ if i32_size <= buf.len() {
+ let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
+ let end = i32_size
+ .checked_add(data_size)
+ .ok_or(general_err!("invalid level length"))?;
+ if end <= buf.len() {
+ return Ok((end, buf.slice(i32_size..end)));
+ }
+ }
+ Err(general_err!("not enough data to read levels"))
}
#[allow(deprecated)]
Encoding::BIT_PACKED => {
@@ -597,6 +602,25 @@ mod tests {
use crate::util::test_common::page_util::InMemoryPageReader;
use crate::util::test_common::rand_gen::make_pages;
+ #[test]
+ fn test_parse_v1_level_invalid_length() {
+ // Say length is 10, but buffer is only 4
+ let buf = Bytes::from(vec![10, 0, 0, 0]);
+ let err = parse_v1_level(1, 100, Encoding::RLE, buf).unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Parquet error: not enough data to read levels"
+ );
+
+ // Say length is 4, but buffer is only 3
+ let buf = Bytes::from(vec![4, 0, 0]);
+ let err = parse_v1_level(1, 100, Encoding::RLE, buf).unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Parquet error: not enough data to read levels"
+ );
+ }
+
const NUM_LEVELS: usize = 128;
const NUM_PAGES: usize = 2;
const MAX_DEF_LEVEL: i16 = 5;
diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs
index de8738cf09..f5336ca7c0 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -381,7 +381,17 @@ impl<T: DataType> DictDecoder<T> {
impl<T: DataType> Decoder<T> for DictDecoder<T> {
fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
// First byte in `data` is bit width
+ if data.is_empty() {
+ return Err(eof_err!("Not enough bytes to decode bit_width"));
+ }
+
let bit_width = data.as_ref()[0];
+ if bit_width > 32 {
+ return Err(general_err!(
+ "Invalid or corrupted RLE bit width {}. Max allowed is 32",
+ bit_width
+ ));
+ }
let mut rle_decoder = RleDecoder::new(bit_width);
rle_decoder.set_data(data.slice(1..));
self.num_values = num_values;
@@ -1395,6 +1405,13 @@ mod tests {
test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 6, 4, &[]);
}
+ #[test]
+ fn test_dict_decoder_empty_data() {
+ let mut decoder = DictDecoder::<Int32Type>::new();
+ let err = decoder.set_data(Bytes::new(), 10).unwrap_err();
+ assert_eq!(err.to_string(), "EOF: Not enough bytes to decode bit_width");
+ }
+
fn test_plain_decode<T: DataType>(
data: Bytes,
num_values: usize,
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index db8227fcac..41c0501320 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -513,7 +513,10 @@ impl RleDecoder {
self.rle_left = (indicator_value >> 1) as u32;
let value_width = bit_util::ceil(self.bit_width as usize, 8);
self.current_value = bit_reader.get_aligned::<u64>(value_width);
- assert!(self.current_value.is_some());
+ assert!(
+ self.current_value.is_some(),
+ "parquet_data_error: not enough data for RLE decoding"
+ );
}
true
} else {
diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs
index 61af21a68e..3adf10fac2 100644
--- a/parquet/src/file/reader.rs
+++ b/parquet/src/file/reader.rs
@@ -124,11 +124,25 @@ impl ChunkReader for Bytes {
fn get_read(&self, start: u64) -> Result<Self::T> {
let start = start as usize;
+ if start > self.len() {
+ return Err(eof_err!(
+ "Expected to read at offset {start}, while file has length {}",
+ self.len()
+ ));
+ }
Ok(self.slice(start..).reader())
}
fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
let start = start as usize;
+ if start > self.len() || start + length > self.len() {
+ return Err(eof_err!(
+ "Expected to read {} bytes at offset {}, while file has length {}",
+ length,
+ start,
+ self.len()
+ ));
+ }
Ok(self.slice(start..start + length))
}
}
@@ -274,3 +288,34 @@ impl Iterator for FilePageIterator {
}
impl PageIterator for FilePageIterator {}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_bytes_chunk_reader_get_read_out_of_bounds() {
+ let data = Bytes::from(vec![0, 1, 2, 3]);
+ let err = data.get_read(5).unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "EOF: Expected to read at offset 5, while file has length 4"
+ );
+ }
+
+ #[test]
+ fn test_bytes_chunk_reader_get_bytes_out_of_bounds() {
+ let data = Bytes::from(vec![0, 1, 2, 3]);
+ let err = data.get_bytes(5, 1).unwrap_err();
+ assert_eq!(
+ err.to_string(),
"EOF: Expected to read 1 bytes at offset 5, while file has length 4"
+ );
+
+ let err = data.get_bytes(2, 3).unwrap_err();
+ assert_eq!(
+ err.to_string(),
"EOF: Expected to read 3 bytes at offset 2, while file has length 4"
+ );
+ }
+}
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 6da5c39d74..3f95ea9d49 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -392,6 +392,9 @@ pub(crate) fn decode_page(
let buffer = match decompressor {
Some(decompressor) if can_decompress => {
let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
+ if offset > buffer.len() || offset > uncompressed_page_size {
+ return Err(general_err!("Invalid page header"));
+ }
let decompressed_size = uncompressed_page_size - offset;
let mut decompressed = Vec::with_capacity(uncompressed_page_size);
decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
@@ -458,7 +461,10 @@ pub(crate) fn decode_page(
}
_ => {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
- unimplemented!("Page type {:?} is not supported", page_header.r#type)
+ return Err(general_err!(
+ "Page type {:?} is not supported",
+ page_header.r#type
+ ));
}
};
@@ -1130,6 +1136,7 @@ mod tests {
use crate::column::reader::ColumnReader;
use crate::data_type::private::ParquetValueType;
use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
+ use crate::file::metadata::thrift::DataPageHeaderV2;
#[allow(deprecated)]
use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
use crate::file::writer::SerializedFileWriter;
@@ -1139,6 +1146,72 @@ mod tests {
use super::*;
+ #[test]
+ fn test_decode_page_invalid_offset() {
+ let page_header = PageHeader {
+ r#type: PageType::DATA_PAGE_V2,
+ uncompressed_page_size: 10,
+ compressed_page_size: 10,
+ data_page_header: None,
+ index_page_header: None,
+ dictionary_page_header: None,
+ crc: None,
+ data_page_header_v2: Some(DataPageHeaderV2 {
+ num_nulls: 0,
+ num_rows: 0,
+ num_values: 0,
+ encoding: Encoding::PLAIN,
+ definition_levels_byte_length: 11,
+ repetition_levels_byte_length: 0,
+ is_compressed: None,
+ statistics: None,
+ }),
+ };
+
+ let buffer = Bytes::new();
+ let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("DataPage v2 header contains implausible values")
+ );
+ }
+
+ #[test]
+ fn test_decode_unsupported_page() {
+ let mut page_header = PageHeader {
+ r#type: PageType::INDEX_PAGE,
+ uncompressed_page_size: 10,
+ compressed_page_size: 10,
+ data_page_header: None,
+ index_page_header: None,
+ dictionary_page_header: None,
+ crc: None,
+ data_page_header_v2: None,
+ };
+ let buffer = Bytes::new();
+ let err = decode_page(page_header.clone(), buffer.clone(), Type::INT32, None).unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Parquet error: Page type INDEX_PAGE is not supported"
+ );
+
+ page_header.data_page_header_v2 = Some(DataPageHeaderV2 {
+ num_nulls: 0,
+ num_rows: 0,
+ num_values: 0,
+ encoding: Encoding::PLAIN,
+ definition_levels_byte_length: 11,
+ repetition_levels_byte_length: 0,
+ is_compressed: None,
+ statistics: None,
+ });
+ let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("DataPage v2 header contains implausible values")
+ );
+ }
+
#[test]
fn test_cursor_and_file_has_the_same_behaviour() {
let mut buf: Vec<u8> = Vec::new();
diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs
index 1ae37d0a46..de6f855685 100644
--- a/parquet/src/schema/types.rs
+++ b/parquet/src/schema/types.rs
@@ -1348,19 +1348,23 @@ fn schema_from_array_helper<'a>(
.with_logical_type(logical_type)
.with_fields(fields)
.with_id(field_id);
- if let Some(rep) = repetition {
- // Sometimes parquet-cpp and parquet-mr set repetition level REQUIRED or
- // REPEATED for root node.
- //
- // We only set repetition for group types that are not top-level message
- // type. According to parquet-format:
- // Root of the schema does not have a repetition_type.
- // All other types must have one.
- if !is_root_node {
- builder = builder.with_repetition(rep);
- }
+
+ // Sometimes parquet-cpp and parquet-mr set repetition level REQUIRED or
+ // REPEATED for root node.
+ //
+ // We only set repetition for group types that are not top-level message
+ // type. According to parquet-format:
+ // Root of the schema does not have a repetition_type.
+ // All other types must have one.
+ if !is_root_node {
+ let Some(rep) = repetition else {
+ return Err(general_err!(
+ "Repetition level must be defined for non-root types"
+ ));
+ };
+ builder = builder.with_repetition(rep);
}
- Ok((next_index, Arc::new(builder.build().unwrap())))
+ Ok((next_index, Arc::new(builder.build()?)))
}
}
}
diff --git a/parquet/tests/arrow_reader/bad_data.rs b/parquet/tests/arrow_reader/bad_data.rs
index 235f818124..54c92976e4 100644
--- a/parquet/tests/arrow_reader/bad_data.rs
+++ b/parquet/tests/arrow_reader/bad_data.rs
@@ -84,10 +84,12 @@ fn test_parquet_1481() {
}
#[test]
-#[should_panic(expected = "assertion failed: self.current_value.is_some()")]
fn test_arrow_gh_41321() {
let err = read_file("ARROW-GH-41321.parquet").unwrap_err();
- assert_eq!(err.to_string(), "TBD (currently panics)");
+ assert_eq!(
+ err.to_string(),
"External: Parquet argument error: Parquet error: Invalid or corrupted RLE bit width 254. Max allowed is 32"
+ );
}
#[test]