This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 8eca76df29 Move ParquetMetadata decoder state machine into 
ParquetMetadataPushDecoder (#8340)
8eca76df29 is described below

commit 8eca76df2986908bcfa175fa6b16c52d1019a874
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Sep 30 03:07:42 2025 -0700

    Move ParquetMetadata decoder state machine into ParquetMetadataPushDecoder 
(#8340)
    
    # Which issue does this PR close?
    
    - part of #8000
    - Follow on to https://github.com/apache/arrow-rs/pull/8080
    - Closes https://github.com/apache/arrow-rs/issues/8439
    
    # Rationale for this change
    
    The current ParquetMetadataDecoder intermixes three things:
    1. The state machine for decoding parquet metadata (footer, then
    metadata, then (optional) indexes)
    2. Orchestrating IO (i.e. calling read, etc.)
    3. Decoding thrift-encoded bytes into objects
    
    This makes it almost impossible to add features like "only decode a
    subset of the columns in the ColumnIndex" and other potentially advanced
    use cases
    
    Now that we have a "push" style API for metadata decoding that avoids
    IO, the next step is to extract out the actual work into this API so
    that the existing ParquetMetadataDecoder just calls into the PushDecoder
    
    # What changes are included in this PR?
    
    1. Extract the decoding state machine into PushMetadataDecoder
    2. Extract thrift parsing into its own `parser` module
    3. Update ParquetMetadataDecoder to use the PushMetadataDecoder
    4. Extract the bytes --> object code into its own module
    
    This almost certainly will conflict with @etseidl 's plans in
    thrift-remodel.
    
    # Are these changes tested?
    By existing tests.
    
    # Are there any user-facing changes?
    
    Not really -- this is an internal change that will make it easier to add
    features like "only decode a subset of the columns in the ColumnIndex",
    for example
---
 parquet/src/file/metadata/parser.rs       |  84 ++++++++-
 parquet/src/file/metadata/push_decoder.rs | 297 ++++++++++++++++++++++--------
 parquet/src/file/metadata/reader.rs       | 262 ++++++++++++++++----------
 parquet/tests/arrow_reader/io/mod.rs      |   4 +-
 4 files changed, 472 insertions(+), 175 deletions(-)

diff --git a/parquet/src/file/metadata/parser.rs 
b/parquet/src/file/metadata/parser.rs
index a68f14d4d7..2a297e2273 100644
--- a/parquet/src/file/metadata/parser.rs
+++ b/parquet/src/file/metadata/parser.rs
@@ -43,6 +43,86 @@ use crate::encryption::{
 #[cfg(feature = "encryption")]
 use crate::format::EncryptionAlgorithm;
 
+/// Helper struct for metadata parsing
+///
+/// This structure parses thrift-encoded bytes into the correct Rust structs,
+/// such as [`ParquetMetaData`], handling decryption if necessary.
+//
+// Note this structure is used to minimize the number of
+// places need to add `#[cfg(feature = "encryption")]` checks.
+pub(crate) use inner::MetadataParser;
+
+#[cfg(feature = "encryption")]
+mod inner {
+    use super::*;
+    use crate::encryption::decrypt::FileDecryptionProperties;
+    use crate::errors::Result;
+
+    /// API for decoding metadata that may be encrypted
+    #[derive(Debug, Default)]
+    pub(crate) struct MetadataParser {
+        // the credentials and keys needed to decrypt metadata
+        file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
+    }
+
+    impl MetadataParser {
+        pub(crate) fn new() -> Self {
+            MetadataParser::default()
+        }
+
+        pub(crate) fn with_file_decryption_properties(
+            mut self,
+            file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
+        ) -> Self {
+            self.file_decryption_properties = file_decryption_properties;
+            self
+        }
+
+        pub(crate) fn decode_metadata(
+            &self,
+            buf: &[u8],
+            encrypted_footer: bool,
+        ) -> Result<ParquetMetaData> {
+            decode_metadata_with_encryption(
+                buf,
+                encrypted_footer,
+                self.file_decryption_properties.as_deref(),
+            )
+        }
+    }
+}
+
+#[cfg(not(feature = "encryption"))]
+mod inner {
+    use super::*;
+    use crate::errors::Result;
+    /// parallel implementation when encryption feature is not enabled
+    ///
+    /// This has the same API as the encryption-enabled version
+    #[derive(Debug, Default)]
+    pub(crate) struct MetadataParser;
+
+    impl MetadataParser {
+        pub(crate) fn new() -> Self {
+            MetadataParser
+        }
+
+        pub(crate) fn decode_metadata(
+            &self,
+            buf: &[u8],
+            encrypted_footer: bool,
+        ) -> Result<ParquetMetaData> {
+            if encrypted_footer {
+                Err(general_err!(
+                    "Parquet file has an encrypted footer but the encryption 
feature is disabled"
+                ))
+            } else {
+                decode_metadata(buf)
+            }
+        }
+    }
+}
+
 /// Decodes [`ParquetMetaData`] from the provided bytes.
 ///
 /// Typically this is used to decode the metadata from the end of a parquet
@@ -79,7 +159,7 @@ pub(crate) fn decode_metadata(buf: &[u8]) -> 
crate::errors::Result<ParquetMetaDa
 
 /// Parses column orders from Thrift definition.
 /// If no column orders are defined, returns `None`.
-pub(crate) fn parse_column_orders(
+fn parse_column_orders(
     t_column_orders: Option<Vec<crate::format::ColumnOrder>>,
     schema_descr: &SchemaDescriptor,
 ) -> crate::errors::Result<Option<Vec<ColumnOrder>>> {
@@ -288,7 +368,7 @@ fn parse_single_offset_index(
 /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
 /// [Parquet Encryption Spec]: 
https://parquet.apache.org/docs/file-format/data-pages/encryption/
 #[cfg(feature = "encryption")]
-pub(crate) fn decode_metadata_with_encryption(
+fn decode_metadata_with_encryption(
     buf: &[u8],
     encrypted_footer: bool,
     file_decryption_properties: Option<&FileDecryptionProperties>,
diff --git a/parquet/src/file/metadata/push_decoder.rs 
b/parquet/src/file/metadata/push_decoder.rs
index 811caf4fd4..2e55146e56 100644
--- a/parquet/src/file/metadata/push_decoder.rs
+++ b/parquet/src/file/metadata/push_decoder.rs
@@ -15,23 +15,37 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::errors::ParquetError;
-use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, 
ParquetMetaDataReader};
+#[cfg(feature = "encryption")]
+use crate::encryption::decrypt::FileDecryptionProperties;
+use crate::errors::{ParquetError, Result};
+use crate::file::metadata::parser::{parse_column_index, parse_offset_index, 
MetadataParser};
+use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData};
+use crate::file::page_index::index_reader::acc_range;
+use crate::file::reader::ChunkReader;
+use crate::file::FOOTER_SIZE;
 use crate::DecodeResult;
+use bytes::Bytes;
+use std::ops::Range;
 
 /// A push decoder for [`ParquetMetaData`].
 ///
-/// This structure implements a push API based version of the 
[`ParquetMetaDataReader`], which
-/// decouples the IO from the metadata decoding logic.
+/// This structure implements a push API for decoding Parquet metadata, which
+/// decouples IO from the metadata decoding logic (sometimes referred to as
+/// [Sans-IO]).
 ///
-/// You can use this decoder to customize your IO operations, as shown in the
-/// examples below for minimizing bytes read, prefetching data, or
-/// using async IO.
+/// See [`ParquetMetaDataReader`] for a pull-based API that incorporates IO and
+/// is simpler to use for basic use cases. This decoder is best for customizing
+/// your IO operations to minimize bytes read, prefetch data, or use async IO.
+///
+/// [Sans-IO]: https://sans-io.readthedocs.io
+/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
 ///
 /// # Example
 ///
 /// The most basic usage is to feed the decoder with the necessary byte ranges
-/// as requested as shown below.
+/// as requested as shown below. This minimizes the number of bytes read, but
+/// requires the most IO operations - one to read the footer and then one
+/// to read the metadata, and possibly more if page indexes are requested.
 ///
 /// ```rust
 /// # use std::ops::Range;
@@ -192,9 +206,16 @@ use crate::DecodeResult;
 /// [`AsyncRead`]: tokio::io::AsyncRead
 #[derive(Debug)]
 pub struct ParquetMetaDataPushDecoder {
-    done: bool,
-    metadata_reader: ParquetMetaDataReader,
+    /// Decoding state
+    state: DecodeState,
+    /// policy for loading ColumnIndex (part of the PageIndex)
+    column_index_policy: PageIndexPolicy,
+    /// policy for loading OffsetIndex (part of the PageIndex)
+    offset_index_policy: PageIndexPolicy,
+    /// Underlying buffers
     buffers: crate::util::push_buffers::PushBuffers,
+    /// Encryption API
+    metadata_parser: MetadataParser,
 }
 
 impl ParquetMetaDataPushDecoder {
@@ -204,23 +225,39 @@ impl ParquetMetaDataPushDecoder {
     /// [`ParquetMetaDataPushDecoder::with_page_index_policy`] for more detail.
     ///
     /// See examples on [`ParquetMetaDataPushDecoder`].
-    pub fn try_new(file_len: u64) -> Result<Self, ParquetError> {
+    pub fn try_new(file_len: u64) -> Result<Self> {
         if file_len < 8 {
             return Err(ParquetError::General(format!(
                 "Parquet files are at least 8 bytes long, but file length is 
{file_len}"
             )));
         };
 
-        let metadata_reader =
-            
ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Optional);
-
         Ok(Self {
-            done: false,
-            metadata_reader,
+            state: DecodeState::ReadingFooter,
+            column_index_policy: PageIndexPolicy::Optional,
+            offset_index_policy: PageIndexPolicy::Optional,
             buffers: crate::util::push_buffers::PushBuffers::new(file_len),
+            metadata_parser: MetadataParser::new(),
         })
     }
 
+    /// Begin decoding from the given footer tail.
+    pub(crate) fn try_new_with_footer_tail(file_len: u64, footer_tail: 
FooterTail) -> Result<Self> {
+        let mut new_self = Self::try_new(file_len)?;
+        new_self.state = DecodeState::ReadingMetadata(footer_tail);
+        Ok(new_self)
+    }
+
+    /// Create a decoder with the given `ParquetMetaData` already known.
+    ///
+    /// This can be used to parse and populate the page index structures
+    /// after the metadata has already been decoded.
+    pub fn try_new_with_metadata(file_len: u64, metadata: ParquetMetaData) -> 
Result<Self> {
+        let mut new_self = Self::try_new(file_len)?;
+        new_self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
+        Ok(new_self)
+    }
+
     /// Enable or disable reading the page index structures described in
     /// "[Parquet page index] Layout to Support Page Skipping".
     ///
@@ -232,9 +269,32 @@ impl ParquetMetaDataPushDecoder {
     ///
     /// [Parquet page index]: 
https://github.com/apache/parquet-format/blob/master/PageIndex.md
     pub fn with_page_index_policy(mut self, page_index_policy: 
PageIndexPolicy) -> Self {
-        self.metadata_reader = self
-            .metadata_reader
-            .with_page_index_policy(page_index_policy);
+        self.column_index_policy = page_index_policy;
+        self.offset_index_policy = page_index_policy;
+        self
+    }
+
+    /// Set the policy for reading the ColumnIndex (part of the PageIndex)
+    pub fn with_column_index_policy(mut self, column_index_policy: 
PageIndexPolicy) -> Self {
+        self.column_index_policy = column_index_policy;
+        self
+    }
+
+    /// Set the policy for reading the OffsetIndex (part of the PageIndex)
+    pub fn with_offset_index_policy(mut self, offset_index_policy: 
PageIndexPolicy) -> Self {
+        self.offset_index_policy = offset_index_policy;
+        self
+    }
+
+    #[cfg(feature = "encryption")]
+    /// Provide decryption properties for decoding encrypted Parquet files
+    pub(crate) fn with_file_decryption_properties(
+        mut self,
+        file_decryption_properties: 
Option<std::sync::Arc<FileDecryptionProperties>>,
+    ) -> Self {
+        self.metadata_parser = self
+            .metadata_parser
+            .with_file_decryption_properties(file_decryption_properties);
         self
     }
 
@@ -257,72 +317,161 @@ impl ParquetMetaDataPushDecoder {
     ///
     /// Speculatively pushing data can be used when  "prefetching" data. See
     /// example on [`Self`]
-    pub fn push_ranges(
-        &mut self,
-        ranges: Vec<std::ops::Range<u64>>,
-        buffers: Vec<bytes::Bytes>,
-    ) -> std::result::Result<(), String> {
-        if self.done {
-            return Err(
+    pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: 
Vec<Bytes>) -> Result<()> {
+        if matches!(&self.state, DecodeState::Finished) {
+            return Err(general_err!(
                 "ParquetMetaDataPushDecoder: cannot push data after decoding 
is finished"
-                    .to_string(),
-            );
+            ));
         }
         self.buffers.push_ranges(ranges, buffers);
         Ok(())
     }
 
-    /// Try to decode the metadata from the pushed data, returning the
-    /// decoded metadata or an error if not enough data is available.
-    pub fn try_decode(
-        &mut self,
-    ) -> std::result::Result<DecodeResult<ParquetMetaData>, ParquetError> {
-        if self.done {
-            return Ok(DecodeResult::Finished);
+    /// Pushes a single range of data into the decoder's buffer.
+    pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) -> 
Result<()> {
+        if matches!(&self.state, DecodeState::Finished) {
+            return Err(general_err!(
+                "ParquetMetaDataPushDecoder: cannot push data after decoding 
is finished"
+            ));
         }
+        self.buffers.push_range(range, buffer);
+        Ok(())
+    }
 
-        // need to have the last 8 bytes of the file to decode the metadata
+    /// Try to decode the metadata from the pushed data, returning the
+    /// decoded metadata or an error if not enough data is available.
+    pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>> {
         let file_len = self.buffers.file_len();
-        if !self.buffers.has_range(&(file_len - 8..file_len)) {
-            #[expect(clippy::single_range_in_vec_init)]
-            return Ok(DecodeResult::NeedsData(vec![file_len - 8..file_len]));
+        let footer_len = FOOTER_SIZE as u64;
+        loop {
+            match std::mem::replace(&mut self.state, 
DecodeState::Intermediate) {
+                DecodeState::ReadingFooter => {
+                    // need to have the last 8 bytes of the file to decode the 
metadata
+                    let footer_start = file_len.saturating_sub(footer_len);
+                    let footer_range = footer_start..file_len;
+
+                    if !self.buffers.has_range(&footer_range) {
+                        self.state = DecodeState::ReadingFooter;
+                        return Ok(needs_range(footer_range));
+                    }
+                    let footer_bytes = self.get_bytes(&footer_range)?;
+                    let footer_tail = 
FooterTail::try_from(footer_bytes.as_ref())?;
+
+                    self.state = DecodeState::ReadingMetadata(footer_tail);
+                    continue;
+                }
+
+                DecodeState::ReadingMetadata(footer_tail) => {
+                    let metadata_len: u64 = footer_tail.metadata_length() as 
u64;
+                    let metadata_start = file_len - footer_len - metadata_len;
+                    let metadata_end = metadata_start + metadata_len;
+                    let metadata_range = metadata_start..metadata_end;
+
+                    if !self.buffers.has_range(&metadata_range) {
+                        self.state = DecodeState::ReadingMetadata(footer_tail);
+                        return Ok(needs_range(metadata_range));
+                    }
+
+                    let metadata = self.metadata_parser.decode_metadata(
+                        &self.get_bytes(&metadata_range)?,
+                        footer_tail.is_encrypted_footer(),
+                    )?;
+                    // Note: ReadingPageIndex first checks if page indexes are 
needed
+                    // and is a no-op if not
+                    self.state = 
DecodeState::ReadingPageIndex(Box::new(metadata));
+                    continue;
+                }
+
+                DecodeState::ReadingPageIndex(mut metadata) => {
+                    // First determine if any page indexes are needed based on
+                    // the specified policies
+                    let range = range_for_page_index(
+                        &metadata,
+                        self.column_index_policy,
+                        self.offset_index_policy,
+                    );
+
+                    let Some(page_index_range) = range else {
+                        self.state = DecodeState::Finished;
+                        return Ok(DecodeResult::Data(*metadata));
+                    };
+
+                    if !self.buffers.has_range(&page_index_range) {
+                        self.state = DecodeState::ReadingPageIndex(metadata);
+                        return Ok(needs_range(page_index_range));
+                    }
+
+                    let buffer = self.get_bytes(&page_index_range)?;
+                    let offset = page_index_range.start;
+                    parse_column_index(&mut metadata, 
self.column_index_policy, &buffer, offset)?;
+                    parse_offset_index(&mut metadata, 
self.offset_index_policy, &buffer, offset)?;
+                    self.state = DecodeState::Finished;
+                    return Ok(DecodeResult::Data(*metadata));
+                }
+
+                DecodeState::Finished => return Ok(DecodeResult::Finished),
+                DecodeState::Intermediate => {
+                    return Err(general_err!(
+                        "ParquetMetaDataPushDecoder: internal error, invalid 
state"
+                    ));
+                }
+            }
         }
+    }
 
-        // Try to parse the metadata from the buffers we have.
-        //
-        // If we don't have enough data, returns a `ParquetError::NeedMoreData`
-        // with the number of bytes needed to complete the metadata parsing.
-        //
-        // If we have enough data, returns `Ok(())` and we can complete
-        // the metadata parsing.
-        let maybe_metadata = self
-            .metadata_reader
-            .try_parse_sized(&self.buffers, self.buffers.file_len());
-
-        match maybe_metadata {
-            Ok(()) => {
-                // Metadata successfully parsed, proceed to decode the row 
groups
-                let metadata = self.metadata_reader.finish()?;
-                self.done = true;
-                Ok(DecodeResult::Data(metadata))
-            }
+    /// Returns the bytes for the given range from the internal buffer
+    fn get_bytes(&self, range: &Range<u64>) -> Result<Bytes> {
+        let start = range.start;
+        let raw_len = range.end - range.start;
+        let len: usize = raw_len.try_into().map_err(|_| {
+            ParquetError::General(format!(
+                "ParquetMetaDataPushDecoder: Range length too large to fit in 
usize: {raw_len}",
+            ))
+        })?;
+        self.buffers.get_bytes(start, len)
+    }
+}
 
-            Err(ParquetError::NeedMoreData(needed)) => {
-                let needed = needed as u64;
-                let Some(start_offset) = file_len.checked_sub(needed) else {
-                    return Err(ParquetError::General(format!(
-                        "Parquet metadata reader needs at least {needed} 
bytes, but file length is only {file_len}"
-                    )));
-                };
-                let needed_range = start_offset..start_offset + needed;
-                // needs `needed_range` bytes at the end of the file
-                Ok(DecodeResult::NeedsData(vec![needed_range]))
-            }
-            Err(ParquetError::NeedMoreDataRange(range)) => 
Ok(DecodeResult::NeedsData(vec![range])),
+/// returns a DecodeResults that describes needing the given range
+fn needs_range(range: Range<u64>) -> DecodeResult<ParquetMetaData> {
+    DecodeResult::NeedsData(vec![range])
+}
+
+/// Decoding state machine
+#[derive(Debug)]
+enum DecodeState {
+    /// Reading the last 8 bytes of the file
+    ReadingFooter,
+    /// Reading the metadata thrift structure
+    ReadingMetadata(FooterTail),
+    // Actively reading the page index
+    ReadingPageIndex(Box<ParquetMetaData>),
+    // Decoding is complete
+    Finished,
+    /// State left during the `try_decode` method so something valid is 
present.
+    /// This state should never be observed.
+    Intermediate,
+}
 
-            Err(e) => Err(e), // some other error, pass back
+/// Returns the byte range needed to read the offset/page indexes, based on the
+/// specified policies
+///
+/// Returns None if no page indexes are needed
+pub fn range_for_page_index(
+    metadata: &ParquetMetaData,
+    column_index_policy: PageIndexPolicy,
+    offset_index_policy: PageIndexPolicy,
+) -> Option<Range<u64>> {
+    let mut range = None;
+    for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
+        if column_index_policy != PageIndexPolicy::Skip {
+            range = acc_range(range, c.column_index_range());
+        }
+        if offset_index_policy != PageIndexPolicy::Skip {
+            range = acc_range(range, c.offset_index_range());
         }
     }
+    range
 }
 
 // These tests use the arrow writer to create a parquet file in memory
@@ -533,7 +682,7 @@ mod tests {
     }
 
     /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and 
return the corresponding element
-    fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) -> 
T {
+    fn expect_data<T: Debug>(result: Result<DecodeResult<T>>) -> T {
         match result.expect("Expected Ok(DecodeResult::Data(T))") {
             DecodeResult::Data(data) => data,
             result => panic!("Expected DecodeResult::Data, got {result:?}"),
@@ -541,16 +690,14 @@ mod tests {
     }
 
     /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and 
return the corresponding ranges
-    fn expect_needs_data<T: Debug>(
-        result: Result<DecodeResult<T>, ParquetError>,
-    ) -> Vec<Range<u64>> {
+    fn expect_needs_data<T: Debug>(result: Result<DecodeResult<T>>) -> 
Vec<Range<u64>> {
         match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
             DecodeResult::NeedsData(ranges) => ranges,
             result => panic!("Expected DecodeResult::NeedsData, got 
{result:?}"),
         }
     }
 
-    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>, 
ParquetError>) {
+    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>>) {
         match result.expect("Expected Ok(DecodeResult::Finished)") {
             DecodeResult::Finished => {}
             result => panic!("Expected DecodeResult::Finished, got 
{result:?}"),
diff --git a/parquet/src/file/metadata/reader.rs 
b/parquet/src/file/metadata/reader.rs
index 4b8c57175d..61bfcd443c 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -15,21 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{io::Read, ops::Range};
-
 #[cfg(feature = "encryption")]
 use crate::encryption::decrypt::FileDecryptionProperties;
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{FooterTail, ParquetMetaData};
-use crate::file::page_index::index_reader::acc_range;
+use crate::file::metadata::{FooterTail, ParquetMetaData, 
ParquetMetaDataPushDecoder};
 use crate::file::reader::ChunkReader;
 use crate::file::FOOTER_SIZE;
+use bytes::Bytes;
+use std::{io::Read, ops::Range};
 
 #[cfg(all(feature = "async", feature = "arrow"))]
 use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
-#[cfg(feature = "encryption")]
-use crate::file::metadata::parser::decode_metadata_with_encryption;
-use crate::file::metadata::parser::{decode_metadata, parse_column_index, 
parse_offset_index};
+use crate::file::metadata::parser::decode_metadata;
+use crate::DecodeResult;
 
 /// Reads [`ParquetMetaData`] from a byte stream, with either synchronous or
 /// asynchronous I/O.
@@ -40,8 +38,6 @@ use crate::file::metadata::parser::{decode_metadata, 
parse_column_index, parse_o
 ///
 ///  See the [`ParquetMetaDataPushDecoder`] for an API that does not require 
I/O.
 ///
-/// [`ParquetMetaDataPushDecoder`]: 
crate::file::metadata::push_decoder::ParquetMetaDataPushDecoder
-///
 /// # Format Notes
 ///
 /// Parquet metadata is not necessarily contiguous in a Parquet file: a 
portion is stored
@@ -76,7 +72,7 @@ pub struct ParquetMetaDataReader {
     // `self.parse_metadata` is called.
     metadata_size: Option<usize>,
     #[cfg(feature = "encryption")]
-    file_decryption_properties: Option<FileDecryptionProperties>,
+    file_decryption_properties: 
Option<std::sync::Arc<FileDecryptionProperties>>,
 }
 
 /// Describes the policy for reading page indexes
@@ -186,7 +182,7 @@ impl ParquetMetaDataReader {
         mut self,
         properties: Option<&FileDecryptionProperties>,
     ) -> Self {
-        self.file_decryption_properties = properties.cloned();
+        self.file_decryption_properties = 
properties.cloned().map(std::sync::Arc::new);
         self
     }
 
@@ -220,8 +216,6 @@ impl ParquetMetaDataReader {
     ///     .with_page_indexes(true)
     ///     .parse_and_finish(&file).unwrap();
     /// ```
-    ///
-    /// [`Bytes`]: bytes::Bytes
     pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> 
Result<ParquetMetaData> {
         self.try_parse(reader)?;
         self.finish()
@@ -232,8 +226,6 @@ impl ParquetMetaDataReader {
     /// If `reader` is [`Bytes`] based, then the buffer must contain 
sufficient bytes to complete
     /// the request, and must include the Parquet footer. If page indexes are 
desired, the buffer
     /// must contain the entire file, or [`Self::try_parse_sized()`] should be 
used.
-    ///
-    /// [`Bytes`]: bytes::Bytes
     pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
         self.try_parse_sized(reader, reader.len())
     }
@@ -310,8 +302,6 @@ impl ParquetMetaDataReader {
     /// }
     /// let metadata = reader.finish().unwrap();
     /// ```
-    ///
-    /// [`Bytes`]: bytes::Bytes
     pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: 
u64) -> Result<()> {
         self.metadata = match self.parse_metadata(reader) {
             Ok(metadata) => Some(metadata),
@@ -352,24 +342,31 @@ impl ParquetMetaDataReader {
     /// a [`Bytes`] struct containing the tail of the file).
     /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. Like
     /// [`Self::try_parse_sized()`] this function may return 
[`ParquetError::NeedMoreData`].
-    ///
-    /// [`Bytes`]: bytes::Bytes
     pub fn read_page_indexes_sized<R: ChunkReader>(
         &mut self,
         reader: &R,
         file_size: u64,
     ) -> Result<()> {
-        // Get bounds needed for page indexes (if any are present in the file).
-        let Some(range) = self.range_for_page_index() else {
-            return Ok(());
-        };
-
-        let Some(metadata) = self.metadata.as_mut() else {
+        let Some(metadata) = self.metadata.take() else {
             return Err(general_err!(
                 "Tried to read page indexes without ParquetMetaData metadata"
             ));
         };
 
+        let push_decoder = 
ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
+            .with_offset_index_policy(self.offset_index)
+            .with_column_index_policy(self.column_index);
+        let mut push_decoder = self.prepare_push_decoder(push_decoder);
+
+        // Get bounds needed for page indexes (if any are present in the file).
+        let range = match needs_index_data(&mut push_decoder)? {
+            NeedsIndexData::No(metadata) => {
+                self.metadata = Some(metadata);
+                return Ok(());
+            }
+            NeedsIndexData::Yes(range) => range,
+        };
+
         // Check to see if needed range is within `file_range`. Checking 
`range.end` seems
         // redundant, but it guards against `range_for_page_index()` returning 
garbage.
         let file_range = file_size.saturating_sub(reader.len())..file_size;
@@ -398,12 +395,13 @@ impl ParquetMetaDataReader {
             }
         }
 
+        // add the needed ranges to the decoder
         let bytes_needed = usize::try_from(range.end - range.start)?;
         let bytes = reader.get_bytes(range.start - file_range.start, 
bytes_needed)?;
-        let offset = range.start;
 
-        parse_column_index(metadata, self.column_index, &bytes, offset)?;
-        parse_offset_index(metadata, self.offset_index, &bytes, offset)?;
+        push_decoder.push_range(range, bytes)?;
+        let metadata = parse_index_data(&mut push_decoder)?;
+        self.metadata = Some(metadata);
 
         Ok(())
     }
@@ -492,15 +490,27 @@ impl ParquetMetaDataReader {
     async fn load_page_index_with_remainder<F: MetadataFetch>(
         &mut self,
         mut fetch: F,
-        remainder: Option<(usize, bytes::Bytes)>,
+        remainder: Option<(usize, Bytes)>,
     ) -> Result<()> {
-        // Get bounds needed for page indexes (if any are present in the file).
-        let Some(range) = self.range_for_page_index() else {
-            return Ok(());
+        let Some(metadata) = self.metadata.take() else {
+            return Err(general_err!("Footer metadata is not present"));
         };
 
-        let Some(metadata) = self.metadata.as_mut() else {
-            return Err(general_err!("Footer metadata is not present"));
+        // in this case we don't actually know what the file size is, so just 
use u64::MAX
+        // this is ok since the offsets in the metadata are always valid
+        let file_size = u64::MAX;
+        let push_decoder = 
ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
+            .with_offset_index_policy(self.offset_index)
+            .with_column_index_policy(self.column_index);
+        let mut push_decoder = self.prepare_push_decoder(push_decoder);
+
+        // Get bounds needed for page indexes (if any are present in the file).
+        let range = match needs_index_data(&mut push_decoder)? {
+            NeedsIndexData::No(metadata) => {
+                self.metadata = Some(metadata);
+                return Ok(());
+            }
+            NeedsIndexData::Yes(range) => range,
         };
 
         let bytes = match &remainder {
@@ -517,31 +527,12 @@ impl ParquetMetaDataReader {
 
         // Sanity check
         assert_eq!(bytes.len() as u64, range.end - range.start);
-
-        parse_column_index(metadata, self.column_index, &bytes, range.start)?;
-        parse_offset_index(metadata, self.offset_index, &bytes, range.start)?;
-
+        push_decoder.push_range(range.clone(), bytes)?;
+        let metadata = parse_index_data(&mut push_decoder)?;
+        self.metadata = Some(metadata);
         Ok(())
     }
 
-    fn range_for_page_index(&self) -> Option<Range<u64>> {
-        // sanity check
-        self.metadata.as_ref()?;
-
-        // Get bounds needed for page indexes (if any are present in the file).
-        let mut range = None;
-        let metadata = self.metadata.as_ref().unwrap();
-        for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
-            if self.column_index != PageIndexPolicy::Skip {
-                range = acc_range(range, c.column_index_range());
-            }
-            if self.offset_index != PageIndexPolicy::Skip {
-                range = acc_range(range, c.offset_index_range());
-            }
-        }
-        range
-    }
-
     // One-shot parse of footer.
     // Side effect: this will set `self.metadata_size`
     fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> 
Result<ParquetMetaData> {
@@ -556,7 +547,7 @@ impl ParquetMetaDataReader {
             .get_read(file_size - 8)?
             .read_exact(&mut footer)?;
 
-        let footer = Self::decode_footer_tail(&footer)?;
+        let footer = FooterTail::try_new(&footer)?;
         let metadata_len = footer.metadata_length();
         let footer_metadata_len = FOOTER_SIZE + metadata_len;
         self.metadata_size = Some(footer_metadata_len);
@@ -566,10 +557,8 @@ impl ParquetMetaDataReader {
         }
 
         let start = file_size - footer_metadata_len as u64;
-        self.decode_footer_metadata(
-            chunk_reader.get_bytes(start, metadata_len)?.as_ref(),
-            &footer,
-        )
+        let bytes = chunk_reader.get_bytes(start, metadata_len)?;
+        self.decode_footer_metadata(bytes, file_size, footer)
     }
 
     /// Return the number of bytes to read in the initial pass. If 
`prefetch_size` has
@@ -590,7 +579,7 @@ impl ParquetMetaDataReader {
         &self,
         fetch: &mut F,
         file_size: u64,
-    ) -> Result<(ParquetMetaData, Option<(usize, bytes::Bytes)>)> {
+    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
         let prefetch = self.get_prefetch_size() as u64;
 
         if file_size < FOOTER_SIZE as u64 {
@@ -618,7 +607,7 @@ impl ParquetMetaDataReader {
         let mut footer = [0; FOOTER_SIZE];
         footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
 
-        let footer = Self::decode_footer_tail(&footer)?;
+        let footer = FooterTail::try_new(&footer)?;
         let length = footer.metadata_length();
 
         if file_size < (length + FOOTER_SIZE) as u64 {
@@ -635,14 +624,14 @@ impl ParquetMetaDataReader {
             let meta = fetch
                 .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
                 .await?;
-            Ok((self.decode_footer_metadata(&meta, &footer)?, None))
+            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
         } else {
             let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - 
footer_start)
                 .try_into()
                 .expect("metadata length should never be larger than u32");
-            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
+            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
             Ok((
-                self.decode_footer_metadata(slice, &footer)?,
+                self.decode_footer_metadata(slice, file_size, footer)?,
                 Some((footer_start as usize, suffix.slice(..metadata_start))),
             ))
         }
@@ -652,7 +641,7 @@ impl ParquetMetaDataReader {
     async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
         &self,
         fetch: &mut F,
-    ) -> Result<(ParquetMetaData, Option<(usize, bytes::Bytes)>)> {
+    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
         let prefetch = self.get_prefetch_size();
 
         let suffix = fetch.fetch_suffix(prefetch as _).await?;
@@ -669,8 +658,11 @@ impl ParquetMetaDataReader {
         let mut footer = [0; FOOTER_SIZE];
         footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
 
-        let footer = Self::decode_footer_tail(&footer)?;
+        let footer = FooterTail::try_new(&footer)?;
         let length = footer.metadata_length();
+        // fake file size as we are only parsing the footer metadata here
+        // (can't parse page indexes without the full file size)
+        let file_size = (length + FOOTER_SIZE) as u64;
 
         // Did not fetch the entire file metadata in the initial read, need to 
make a second request
         let metadata_offset = length + FOOTER_SIZE;
@@ -685,22 +677,21 @@ impl ParquetMetaDataReader {
                 ));
             }
 
-            Ok((
-                // need to slice off the footer or decryption fails
-                self.decode_footer_metadata(&meta.slice(0..length), &footer)?,
-                None,
-            ))
+            // need to slice off the footer or decryption fails
+            let meta = meta.slice(0..length);
+            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
         } else {
             let metadata_start = suffix_len - metadata_offset;
-            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
+            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
             Ok((
-                self.decode_footer_metadata(slice, &footer)?,
+                self.decode_footer_metadata(slice, file_size, footer)?,
                 Some((0, suffix.slice(..metadata_start))),
             ))
         }
     }
 
     /// Decodes a [`FooterTail`] from the provided 8-byte slice.
+    #[deprecated(since = "57.0.0", note = "Use FooterTail::try_new instead")]
     pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> 
{
         FooterTail::try_new(slice)
     }
@@ -708,7 +699,7 @@ impl ParquetMetaDataReader {
     /// Decodes the Parquet footer, returning the metadata length in bytes
     #[deprecated(since = "54.3.0", note = "Use decode_footer_tail instead")]
     pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
-        Self::decode_footer_tail(slice).map(|f| f.metadata_length())
+        FooterTail::try_new(slice).map(|f| f.metadata_length())
     }
 
     /// Decodes [`ParquetMetaData`] from the provided bytes.
@@ -726,26 +717,68 @@ impl ParquetMetaDataReader {
     /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
     pub(crate) fn decode_footer_metadata(
         &self,
-        buf: &[u8],
-        footer_tail: &FooterTail,
+        buf: Bytes,
+        file_size: u64,
+        footer_tail: FooterTail,
     ) -> Result<ParquetMetaData> {
-        #[cfg(feature = "encryption")]
-        let result = decode_metadata_with_encryption(
-            buf,
-            footer_tail.is_encrypted_footer(),
-            self.file_decryption_properties.as_ref(),
-        );
-        #[cfg(not(feature = "encryption"))]
-        let result = {
-            if footer_tail.is_encrypted_footer() {
-                Err(general_err!(
-                    "Parquet file has an encrypted footer but the encryption 
feature is disabled"
-                ))
-            } else {
-                Self::decode_metadata(buf)
-            }
-        };
-        result
+        // The push decoder expects the metadata to be at the end of the file
+        // (... data ...) + (metadata) + (footer)
+        // so we need to provide the starting offset of the metadata
+        // within the file.
+        let ending_offset = file_size.checked_sub(FOOTER_SIZE as 
u64).ok_or_else(|| {
+            general_err!(
+                "file size {file_size} is smaller than footer size {}",
+                FOOTER_SIZE
+            )
+        })?;
+
+        let starting_offset = ending_offset.checked_sub(buf.len() as 
u64).ok_or_else(|| {
+            general_err!(
+                "file size {file_size} is smaller than buffer size {} + footer 
size {}",
+                buf.len(),
+                FOOTER_SIZE
+            )
+        })?;
+
+        let range = starting_offset..ending_offset;
+
+        let push_decoder =
+            ParquetMetaDataPushDecoder::try_new_with_footer_tail(file_size, 
footer_tail)?
+                // NOTE: DO NOT enable page indexes here, they are handled 
separately
+                .with_page_index_policy(PageIndexPolicy::Skip);
+
+        let mut push_decoder = self.prepare_push_decoder(push_decoder);
+        push_decoder.push_range(range, buf)?;
+        match push_decoder.try_decode()? {
+            DecodeResult::Data(metadata) => Ok(metadata),
+            DecodeResult::Finished => Err(general_err!(
+                "could not parse parquet metadata -- previously finished"
+            )),
+            DecodeResult::NeedsData(ranges) => Err(general_err!(
+                "could not parse parquet metadata, needs ranges {:?}",
+                ranges
+            )),
+        }
+    }
+
+    /// Prepares a push decoder by applying the configured file decryption properties.
+    #[cfg(feature = "encryption")]
+    fn prepare_push_decoder(
+        &self,
+        push_decoder: ParquetMetaDataPushDecoder,
+    ) -> ParquetMetaDataPushDecoder {
+        push_decoder.with_file_decryption_properties(
+            self.file_decryption_properties
+                .as_ref()
+                .map(std::sync::Arc::clone),
+        )
+    }
+    #[cfg(not(feature = "encryption"))]
+    fn prepare_push_decoder(
+        &self,
+        push_decoder: ParquetMetaDataPushDecoder,
+    ) -> ParquetMetaDataPushDecoder {
+        push_decoder
     }
 
     /// Decodes [`ParquetMetaData`] from the provided bytes.
@@ -761,12 +794,49 @@ impl ParquetMetaDataReader {
     }
 }
 
+/// The bounds needed to read page indexes
+// this is an internal enum, so it is ok to allow differences in enum size
+#[allow(clippy::large_enum_variant)]
+enum NeedsIndexData {
+    /// no additional data is needed (e.g. the indexes weren't requested)
+    No(ParquetMetaData),
+    /// Additional data is needed, with the range that is required
+    Yes(Range<u64>),
+}
+
+/// Determines a single combined range of bytes needed to read the page 
indexes,
+/// or returns the metadata if no additional data is needed (e.g. if no page 
indexes are requested)
+fn needs_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> 
Result<NeedsIndexData> {
+    match push_decoder.try_decode()? {
+        DecodeResult::NeedsData(ranges) => {
+            let range = ranges
+                .into_iter()
+                .reduce(|a, b| a.start.min(b.start)..a.end.max(b.end))
+                .ok_or_else(|| general_err!("Internal error: no ranges 
provided"))?;
+            Ok(NeedsIndexData::Yes(range))
+        }
+        DecodeResult::Data(metadata) => Ok(NeedsIndexData::No(metadata)),
+        DecodeResult::Finished => Err(general_err!("Internal error: decoder 
was finished")),
+    }
+}
+
+/// Given a push decoder that has had the needed ranges pushed to it,
+/// attempt to decode indexes and return the updated metadata.
+fn parse_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> 
Result<ParquetMetaData> {
+    match push_decoder.try_decode()? {
+        DecodeResult::NeedsData(_) => Err(general_err!(
+            "Internal error: decoder still needs data after reading required 
range"
+        )),
+        DecodeResult::Data(metadata) => Ok(metadata),
+        DecodeResult::Finished => Err(general_err!("Internal error: decoder 
was finished")),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::file::reader::Length;
     use crate::util::test_common::file_util::get_test_file;
-    use bytes::Bytes;
     use std::ops::Range;
 
     #[test]
diff --git a/parquet/tests/arrow_reader/io/mod.rs 
b/parquet/tests/arrow_reader/io/mod.rs
index b31f295755..2895b61eaf 100644
--- a/parquet/tests/arrow_reader/io/mod.rs
+++ b/parquet/tests/arrow_reader/io/mod.rs
@@ -46,7 +46,7 @@ use parquet::arrow::arrow_reader::{
 };
 use parquet::arrow::{ArrowWriter, ProjectionMask};
 use parquet::data_type::AsBytes;
-use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, 
ParquetOffsetIndex};
+use parquet::file::metadata::{FooterTail, ParquetMetaData, ParquetOffsetIndex};
 use parquet::file::properties::WriterProperties;
 use parquet::file::FOOTER_SIZE;
 use parquet::format::PageLocation;
@@ -210,7 +210,7 @@ impl TestParquetFile {
             .unwrap();
 
         // figure out the metadata location
-        let footer = 
ParquetMetaDataReader::decode_footer_tail(footer).unwrap();
+        let footer = FooterTail::try_new(footer).unwrap();
         let metadata_len = footer.metadata_length();
         let metadata_location = footer_location.start - 
metadata_len..footer_location.start;
 


Reply via email to