This is an automated email from the ASF dual-hosted git repository. etseidl pushed a commit to branch gh5854_thrift_remodel in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/gh5854_thrift_remodel by this push: new db16cb4d84 [thrift-remodel] Add custom `PageLocation` decoder to speed up decoding of page indexes (#8190) db16cb4d84 is described below commit db16cb4d840a9a28324662b3e1a800e097e2db1b Author: Ed Seidl <etse...@users.noreply.github.com> AuthorDate: Wed Aug 27 12:44:18 2025 -0700 [thrift-remodel] Add custom `PageLocation` decoder to speed up decoding of page indexes (#8190) # Which issue does this PR close? **Note: this targets a feature branch, not main** We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. - Part of #5854. # Rationale for this change Add a custom parser for `PageLocation` as the decoding of this struct is one of several hot spots. # What changes are included in this PR? This adds a faster means of obtaining the struct field ids to `ThriftCompactInputProtocol`. For a small struct (3 fields) with all of them required, we can save a good bit of time bypassing `ThriftCompactInputProtocol::read_field_begin` which is very general and can handle out-of-order fields, among other things. By adding a new function `read_field_header`, we can avoid the costly branching that occurs when calculating the new field id (as well as special handling needed for boolean fields). Field validation is then handled on the consuming side while decoding the `PageLocation` struct. Note that to obtain the speed up seen, we need to assume the fields will always be in order, and the field ids will all be encoded as field deltas. This is probably a fairly safe assumption, but there does exist the possibility of custom thrift writers that use absolute field ids. If we encounter such a writer in the wild, this change will need to be reverted. # Are these changes tested? These changes should be covered by existing tests. 
# Are there any user-facing changes? None beyond the changes in this branch. --- parquet/src/file/page_index/index_reader.rs | 11 +++- parquet/src/file/page_index/offset_index.rs | 88 +++++++++++++++++++++++++++++ parquet/src/parquet_thrift.rs | 13 +++++ parquet/tests/arrow_reader/io/mod.rs | 5 ++ 4 files changed, 116 insertions(+), 1 deletion(-) diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs index f35241689e..99e5963b29 100644 --- a/parquet/src/file/page_index/index_reader.rs +++ b/parquet/src/file/page_index/index_reader.rs @@ -133,7 +133,16 @@ pub fn read_offset_indexes<R: ChunkReader>( pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> { let mut prot = ThriftCompactInputProtocol::new(data); - OffsetIndexMetaData::try_from(&mut prot) + + // Try to read fast-path first. If that fails, fall back to slower but more robust + // decoder. + match OffsetIndexMetaData::try_from_fast(&mut prot) { + Ok(offset_index) => Ok(offset_index), + Err(_) => { + prot = ThriftCompactInputProtocol::new(data); + OffsetIndexMetaData::try_from(&mut prot) + } + } } // private struct only used for decoding then discarded diff --git a/parquet/src/file/page_index/offset_index.rs b/parquet/src/file/page_index/offset_index.rs index d4c196a3ae..6cb7539cb5 100644 --- a/parquet/src/file/page_index/offset_index.rs +++ b/parquet/src/file/page_index/offset_index.rs @@ -104,4 +104,92 @@ impl OffsetIndexMetaData { self.unencoded_byte_array_data_bytes.clone(), ) } + + // Fast-path read of offset index. This works because we expect all field deltas to be 1, + // and there's no nesting beyond PageLocation, so no need to save the last field id. Like + // read_page_locations(), this will fail if absolute field id's are used. + pub(super) fn try_from_fast<'a>(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { + // Offset index is a struct with 2 fields. 
First field is an array of PageLocations, + // the second an optional array of i64. + + // read field 1 header, then list header, then vec of PageLocations + let (field_type, delta) = prot.read_field_header()?; + if delta != 1 || field_type != FieldType::List as u8 { + return Err(general_err!("error reading OffsetIndex::page_locations")); + } + + // we have to do this manually because we want to use the fast PageLocation decoder + let list_ident = prot.read_list_begin()?; + let mut page_locations = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + page_locations.push(read_page_location(prot)?); + } + + let mut unencoded_byte_array_data_bytes: Option<Vec<i64>> = None; + + // read second field...if it's Stop we're done + let (mut field_type, delta) = prot.read_field_header()?; + if field_type == FieldType::List as u8 { + if delta != 1 { + return Err(general_err!( + "encountered unknown field while reading OffsetIndex" + )); + } + let vec = Vec::<i64>::try_from(&mut *prot)?; + unencoded_byte_array_data_bytes = Some(vec); + + // this one should be Stop + (field_type, _) = prot.read_field_header()?; + } + + if field_type != FieldType::Stop as u8 { + return Err(general_err!( + "encountered unknown field while reading OffsetIndex" + )); + } + + Ok(Self { + page_locations, + unencoded_byte_array_data_bytes, + }) + } +} + +// hand coding this one because it is very time critical + +// Note: this will fail if the fields are either out of order, or if a suboptimal +// encoder doesn't use field deltas. 
+fn read_page_location<'a>(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<PageLocation> { + // there are 3 fields, all mandatory, so all field deltas should be 1 + let (field_type, delta) = prot.read_field_header()?; + if delta != 1 || field_type != FieldType::I64 as u8 { + return Err(general_err!("error reading PageLocation::offset")); + } + let offset = prot.read_i64()?; + + let (field_type, delta) = prot.read_field_header()?; + if delta != 1 || field_type != FieldType::I32 as u8 { + return Err(general_err!( + "error reading PageLocation::compressed_page_size" + )); + } + let compressed_page_size = prot.read_i32()?; + + let (field_type, delta) = prot.read_field_header()?; + if delta != 1 || field_type != FieldType::I64 as u8 { + return Err(general_err!("error reading PageLocation::first_row_index")); + } + let first_row_index = prot.read_i64()?; + + // read end of struct...return error if there are unknown fields present + let (field_type, _) = prot.read_field_header()?; + if field_type != FieldType::Stop as u8 { + return Err(general_err!("unexpected field in PageLocation")); + } + + Ok(PageLocation { + offset, + compressed_page_size, + first_row_index, + }) } diff --git a/parquet/src/parquet_thrift.rs b/parquet/src/parquet_thrift.rs index 7f5fe47521..2dff498372 100644 --- a/parquet/src/parquet_thrift.rs +++ b/parquet/src/parquet_thrift.rs @@ -244,6 +244,19 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> { Ok(()) } + // This is a specialized version of read_field_begin, solely for use in parsing + // PageLocation structs in the offset index. This function assumes that the delta + // field will always be less than 0xf, fields will be in order, and no boolean fields + // will be read. This also skips validation of the field type. 
+ // + // Returns a tuple of (field_type, field_delta) + pub(crate) fn read_field_header(&mut self) -> Result<(u8, u8)> { + let field_type = self.read_byte()?; + let field_delta = (field_type & 0xf0) >> 4; + let field_type = field_type & 0xf; + Ok((field_type, field_delta)) + } + pub(crate) fn read_field_begin(&mut self) -> Result<FieldIdentifier> { // we can read at least one byte, which is: // - the type diff --git a/parquet/tests/arrow_reader/io/mod.rs b/parquet/tests/arrow_reader/io/mod.rs index 9cafcd714e..bfdb9467e2 100644 --- a/parquet/tests/arrow_reader/io/mod.rs +++ b/parquet/tests/arrow_reader/io/mod.rs @@ -298,6 +298,11 @@ impl TestRowGroups { let start_offset = start_offset as usize; let end_offset = start_offset + length as usize; + let page_locations = page_locations + .iter() + .map(parquet::format::PageLocation::from) + .collect(); + TestColumnChunk { name: column_name.clone(), location: start_offset..end_offset,