This is an automated email from the ASF dual-hosted git repository.

etseidl pushed a commit to branch gh5854_thrift_remodel
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/gh5854_thrift_remodel by this push:
     new db16cb4d84 [thrift-remodel] Add custom `PageLocation` decoder to speed up decoding of page indexes (#8190)
db16cb4d84 is described below

commit db16cb4d840a9a28324662b3e1a800e097e2db1b
Author: Ed Seidl <etse...@users.noreply.github.com>
AuthorDate: Wed Aug 27 12:44:18 2025 -0700

    [thrift-remodel] Add custom `PageLocation` decoder to speed up decoding of page indexes (#8190)
    
    # Which issue does this PR close?
    **Note: this targets a feature branch, not main**
    
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax.
    
    - Part of #5854.
    
    # Rationale for this change
    
    Add a custom parser for `PageLocation` as the decoding of this struct is
    one of several hot spots.
    
    # What changes are included in this PR?
    
    This adds a faster means of obtaining the struct field ids to
    `ThriftCompactInputProtocol`. For a small struct (3 fields) with all of
    them required, we can save a good bit of time by bypassing
    `ThriftCompactInputProtocol::read_field_begin`, which is very general and
    can handle out-of-order fields, among other things. By adding a new
    function `read_field_header`, we can avoid the costly branching that
    occurs when calculating the new field id (as well as special handling
    needed for boolean fields). Field validation is then handled on the
    consuming side while decoding the `PageLocation` struct.
    
    Note that to obtain the speed up seen, we need to assume the fields will
    always be in order, and the field ids will all be encoded as field
    deltas. This is probably a fairly safe assumption, but there does exist
    the possibility of custom thrift writers that use absolute field ids. If
    we encounter such a writer in the wild, this change will need to be
    reverted.
    
    # Are these changes tested?
    
    These changes should be covered by existing tests.
    
    # Are there any user-facing changes?
    
    None beyond the changes in this branch.
---
 parquet/src/file/page_index/index_reader.rs | 11 +++-
 parquet/src/file/page_index/offset_index.rs | 88 +++++++++++++++++++++++++++++
 parquet/src/parquet_thrift.rs               | 13 +++++
 parquet/tests/arrow_reader/io/mod.rs        |  5 ++
 4 files changed, 116 insertions(+), 1 deletion(-)

diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index f35241689e..99e5963b29 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -133,7 +133,16 @@ pub fn read_offset_indexes<R: ChunkReader>(
 
 pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, 
ParquetError> {
     let mut prot = ThriftCompactInputProtocol::new(data);
-    OffsetIndexMetaData::try_from(&mut prot)
+
+    // Try to read fast-path first. If that fails, fall back to slower but 
more robust
+    // decoder.
+    match OffsetIndexMetaData::try_from_fast(&mut prot) {
+        Ok(offset_index) => Ok(offset_index),
+        Err(_) => {
+            prot = ThriftCompactInputProtocol::new(data);
+            OffsetIndexMetaData::try_from(&mut prot)
+        }
+    }
 }
 
 // private struct only used for decoding then discarded
diff --git a/parquet/src/file/page_index/offset_index.rs b/parquet/src/file/page_index/offset_index.rs
index d4c196a3ae..6cb7539cb5 100644
--- a/parquet/src/file/page_index/offset_index.rs
+++ b/parquet/src/file/page_index/offset_index.rs
@@ -104,4 +104,92 @@ impl OffsetIndexMetaData {
             self.unencoded_byte_array_data_bytes.clone(),
         )
     }
+
+    // Fast-path read of offset index. This works because we expect all field deltas to be 1,
+    // and there's no nesting beyond PageLocation, so no need to save the last field id. Like
+    // read_page_locations(), this will fail if absolute field id's are used.
+    pub(super) fn try_from_fast<'a>(prot: &mut ThriftCompactInputProtocol<'a>) 
-> Result<Self> {
+        // Offset index is a struct with 2 fields. First field is an array of 
PageLocations,
+        // the second an optional array of i64.
+
+        // read field 1 header, then list header, then vec of PageLocations
+        let (field_type, delta) = prot.read_field_header()?;
+        if delta != 1 || field_type != FieldType::List as u8 {
+            return Err(general_err!("error reading 
OffsetIndex::page_locations"));
+        }
+
+        // we have to do this manually because we want to use the fast 
PageLocation decoder
+        let list_ident = prot.read_list_begin()?;
+        let mut page_locations = Vec::with_capacity(list_ident.size as usize);
+        for _ in 0..list_ident.size {
+            page_locations.push(read_page_location(prot)?);
+        }
+
+        let mut unencoded_byte_array_data_bytes: Option<Vec<i64>> = None;
+
+        // read second field...if it's Stop we're done
+        let (mut field_type, delta) = prot.read_field_header()?;
+        if field_type == FieldType::List as u8 {
+            if delta != 1 {
+                return Err(general_err!(
+                    "encountered unknown field while reading OffsetIndex"
+                ));
+            }
+            let vec = Vec::<i64>::try_from(&mut *prot)?;
+            unencoded_byte_array_data_bytes = Some(vec);
+
+            // this one should be Stop
+            (field_type, _) = prot.read_field_header()?;
+        }
+
+        if field_type != FieldType::Stop as u8 {
+            return Err(general_err!(
+                "encountered unknown field while reading OffsetIndex"
+            ));
+        }
+
+        Ok(Self {
+            page_locations,
+            unencoded_byte_array_data_bytes,
+        })
+    }
+}
+
+// hand coding this one because it is very time critical
+
+// Note: this will fail if the fields are either out of order, or if a suboptimal
+// encoder doesn't use field deltas.
+fn read_page_location<'a>(prot: &mut ThriftCompactInputProtocol<'a>) -> 
Result<PageLocation> {
+    // there are 3 fields, all mandatory, so all field deltas should be 1
+    let (field_type, delta) = prot.read_field_header()?;
+    if delta != 1 || field_type != FieldType::I64 as u8 {
+        return Err(general_err!("error reading PageLocation::offset"));
+    }
+    let offset = prot.read_i64()?;
+
+    let (field_type, delta) = prot.read_field_header()?;
+    if delta != 1 || field_type != FieldType::I32 as u8 {
+        return Err(general_err!(
+            "error reading PageLocation::compressed_page_size"
+        ));
+    }
+    let compressed_page_size = prot.read_i32()?;
+
+    let (field_type, delta) = prot.read_field_header()?;
+    if delta != 1 || field_type != FieldType::I64 as u8 {
+        return Err(general_err!("error reading 
PageLocation::first_row_index"));
+    }
+    let first_row_index = prot.read_i64()?;
+
+    // read end of struct...return error if there are unknown fields present
+    let (field_type, _) = prot.read_field_header()?;
+    if field_type != FieldType::Stop as u8 {
+        return Err(general_err!("unexpected field in PageLocation"));
+    }
+
+    Ok(PageLocation {
+        offset,
+        compressed_page_size,
+        first_row_index,
+    })
 }
diff --git a/parquet/src/parquet_thrift.rs b/parquet/src/parquet_thrift.rs
index 7f5fe47521..2dff498372 100644
--- a/parquet/src/parquet_thrift.rs
+++ b/parquet/src/parquet_thrift.rs
@@ -244,6 +244,19 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> {
         Ok(())
     }
 
+    // This is a specialized version of read_field_begin, solely for use in parsing
+    // PageLocation structs in the offset index. This function assumes that the delta
+    // field will always be less than 0xf, fields will be in order, and no boolean fields
+    // will be read. This also skips validation of the field type.
+    //
+    // Returns a tuple of (field_type, field_delta)
+    pub(crate) fn read_field_header(&mut self) -> Result<(u8, u8)> {
+        let field_type = self.read_byte()?;
+        let field_delta = (field_type & 0xf0) >> 4;
+        let field_type = field_type & 0xf;
+        Ok((field_type, field_delta))
+    }
+
     pub(crate) fn read_field_begin(&mut self) -> Result<FieldIdentifier> {
         // we can read at least one byte, which is:
         // - the type
diff --git a/parquet/tests/arrow_reader/io/mod.rs b/parquet/tests/arrow_reader/io/mod.rs
index 9cafcd714e..bfdb9467e2 100644
--- a/parquet/tests/arrow_reader/io/mod.rs
+++ b/parquet/tests/arrow_reader/io/mod.rs
@@ -298,6 +298,11 @@ impl TestRowGroups {
                         let start_offset = start_offset as usize;
                         let end_offset = start_offset + length as usize;
 
+                        let page_locations = page_locations
+                            .iter()
+                            .map(parquet::format::PageLocation::from)
+                            .collect();
+
                         TestColumnChunk {
                             name: column_name.clone(),
                             location: start_offset..end_offset,

Reply via email to