This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new a3598a9939 Parquet: Support reading Parquet metadata via suffix range 
requests (#7334)
a3598a9939 is described below

commit a3598a993978f9b68bd80c17d524bab4dca78f8c
Author: Kyle Barron <[email protected]>
AuthorDate: Mon Mar 31 10:11:42 2025 -0400

    Parquet: Support reading Parquet metadata via suffix range requests (#7334)
    
    * Support reading Parquet metadata via suffix requests
    
    * Update doctest
    
    * Add simple test without file length provided
    
    * address comments
    
    * Switch `file_size` optional arg to `with_file_size` method
    
    * Move `prefetch` lookup into `load_metadata`
    
    * Remove extra call to `max`
    
    * fix doctest
    
    * Add test that suffix requests are made
    
    * more tests
---
 parquet/src/arrow/async_reader/metadata.rs     |  10 ++
 parquet/src/arrow/async_reader/store.rs        | 109 ++++++++++---
 parquet/src/file/metadata/reader.rs            | 210 ++++++++++++++++++++++++-
 parquet/tests/arrow_reader/encryption_async.rs |   2 +-
 4 files changed, 300 insertions(+), 31 deletions(-)

diff --git a/parquet/src/arrow/async_reader/metadata.rs 
b/parquet/src/arrow/async_reader/metadata.rs
index 71d2e57ddd..ff183f4185 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -75,6 +75,16 @@ impl<T: AsyncFileReader> MetadataFetch for &mut T {
     }
 }
 
+/// A data source that can be used with [`MetadataLoader`] to load 
[`ParquetMetaData`] via suffix
+/// requests, without knowing the file size
+pub trait MetadataSuffixFetch: MetadataFetch {
+    /// Return a future that fetches the last `n` bytes asynchronously
+    ///
+    /// Note the returned type is a boxed future, often created by
+    /// [FutureExt::boxed]. See the trait documentation for an example
+    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>>;
+}
+
 /// An asynchronous interface to load [`ParquetMetaData`] from an async source
 pub struct MetadataLoader<F> {
     /// Function that fetches byte ranges asynchronously
diff --git a/parquet/src/arrow/async_reader/store.rs 
b/parquet/src/arrow/async_reader/store.rs
index a1e94efd14..9934d2b93d 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -18,12 +18,13 @@
 use std::{ops::Range, sync::Arc};
 
 use crate::arrow::arrow_reader::ArrowReaderOptions;
-use crate::arrow::async_reader::AsyncFileReader;
+use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use bytes::Bytes;
 use futures::{future::BoxFuture, FutureExt, TryFutureExt};
-use object_store::{path::Path, ObjectMeta, ObjectStore};
+use object_store::{path::Path, ObjectStore};
+use object_store::{GetOptions, GetRange};
 use tokio::runtime::Handle;
 
 /// Reads Parquet files in object storage using [`ObjectStore`].
@@ -45,7 +46,7 @@ use tokio::runtime::Handle;
 /// println!("Found Blob with {}B at {}", meta.size, meta.location);
 ///
 /// // Show Parquet metadata
-/// let reader = ParquetObjectReader::new(storage_container, meta);
+/// let reader = ParquetObjectReader::new(storage_container, 
meta.location).with_file_size(meta.size);
 /// let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
 /// print_parquet_metadata(&mut stdout(), builder.metadata());
 /// # }
@@ -53,7 +54,8 @@ use tokio::runtime::Handle;
 #[derive(Clone, Debug)]
 pub struct ParquetObjectReader {
     store: Arc<dyn ObjectStore>,
-    meta: ObjectMeta,
+    path: Path,
+    file_size: Option<usize>,
     metadata_size_hint: Option<usize>,
     preload_column_index: bool,
     preload_offset_index: bool,
@@ -61,13 +63,12 @@ pub struct ParquetObjectReader {
 }
 
 impl ParquetObjectReader {
-    /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] 
and [`ObjectMeta`]
-    ///
-    /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or 
[`ObjectStore::head`]
-    pub fn new(store: Arc<dyn ObjectStore>, meta: ObjectMeta) -> Self {
+    /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] 
and [`Path`].
+    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
         Self {
             store,
-            meta,
+            path,
+            file_size: None,
             metadata_size_hint: None,
             preload_column_index: false,
             preload_offset_index: false,
@@ -84,6 +85,22 @@ impl ParquetObjectReader {
         }
     }
 
+    /// Provide the byte size of this file.
+    ///
+    /// If provided, the file size will ensure that only bounded range 
requests are used. If file
+    /// size is not provided, the reader will use suffix range requests to 
fetch the metadata.
+    ///
+    /// Providing this size up front is an important optimization to avoid 
extra calls when the
+    /// underlying store does not support suffix range requests.
+    ///
+    /// The file size can be obtained using [`ObjectStore::list`] or 
[`ObjectStore::head`].
+    pub fn with_file_size(self, file_size: usize) -> Self {
+        Self {
+            file_size: Some(file_size),
+            ..self
+        }
+    }
+
     /// Load the Column Index as part of [`Self::get_metadata`]
     pub fn with_preload_column_index(self, preload_column_index: bool) -> Self 
{
         Self {
@@ -125,7 +142,7 @@ impl ParquetObjectReader {
     {
         match &self.runtime {
             Some(handle) => {
-                let path = self.meta.location.clone();
+                let path = self.path.clone();
                 let store = Arc::clone(&self.store);
                 handle
                     .spawn(async move { f(&store, &path).await })
@@ -138,13 +155,27 @@ impl ParquetObjectReader {
                     )
                     .boxed()
             }
-            None => f(&self.store, &self.meta.location)
-                .map_err(|e| e.into())
-                .boxed(),
+            None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(),
         }
     }
 }
 
+impl MetadataSuffixFetch for &mut ParquetObjectReader {
+    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
+        let options = GetOptions {
+            range: Some(GetRange::Suffix(suffix)),
+            ..Default::default()
+        };
+        self.spawn(|store, path| {
+            async move {
+                let resp = store.get_opts(path, options).await?;
+                Ok::<_, ParquetError>(resp.bytes().await?)
+            }
+            .boxed()
+        })
+    }
+}
+
 impl AsyncFileReader for ParquetObjectReader {
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, 
Result<Bytes>> {
         self.spawn(|store, path| store.get_range(path, range))
@@ -165,13 +196,16 @@ impl AsyncFileReader for ParquetObjectReader {
     // `Self::get_bytes`.
     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
         Box::pin(async move {
-            let file_size = self.meta.size;
-            let metadata = ParquetMetaDataReader::new()
+            let metadata_reader = ParquetMetaDataReader::new()
                 .with_column_indexes(self.preload_column_index)
                 .with_offset_indexes(self.preload_offset_index)
-                .with_prefetch_hint(self.metadata_size_hint)
-                .load_and_finish(self, file_size)
-                .await?;
+                .with_prefetch_hint(self.metadata_size_hint);
+            let metadata = if let Some(file_size) = self.file_size {
+                metadata_reader.load_and_finish(self, file_size).await?
+            } else {
+                metadata_reader.load_via_suffix_and_finish(self).await?
+            };
+
             Ok(Arc::new(metadata))
         })
     }
@@ -181,7 +215,6 @@ impl AsyncFileReader for ParquetObjectReader {
         options: &'a ArrowReaderOptions,
     ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
         Box::pin(async move {
-            let file_size = self.meta.size;
             let metadata = ParquetMetaDataReader::new()
                 .with_column_indexes(self.preload_column_index)
                 .with_offset_indexes(self.preload_offset_index)
@@ -191,7 +224,11 @@ impl AsyncFileReader for ParquetObjectReader {
             let metadata =
                 
metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
 
-            let metadata = metadata.load_and_finish(self, file_size).await?;
+            let metadata = if let Some(file_size) = self.file_size {
+                metadata.load_and_finish(self, file_size).await?
+            } else {
+                metadata.load_via_suffix_and_finish(self).await?
+            };
 
             Ok(Arc::new(metadata))
         })
@@ -231,7 +268,22 @@ mod tests {
     #[tokio::test]
     async fn test_simple() {
         let (meta, store) = get_meta_store().await;
-        let object_reader = ParquetObjectReader::new(store, meta);
+        let object_reader =
+            ParquetObjectReader::new(store, 
meta.location).with_file_size(meta.size);
+
+        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
+            .await
+            .unwrap();
+        let batches: Vec<_> = 
builder.build().unwrap().try_collect().await.unwrap();
+
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 8);
+    }
+
+    #[tokio::test]
+    async fn test_simple_without_file_length() {
+        let (meta, store) = get_meta_store().await;
+        let object_reader = ParquetObjectReader::new(store, meta.location);
 
         let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
             .await
@@ -247,7 +299,8 @@ mod tests {
         let (mut meta, store) = get_meta_store().await;
         meta.location = Path::from("I don't exist.parquet");
 
-        let object_reader = ParquetObjectReader::new(store, meta);
+        let object_reader =
+            ParquetObjectReader::new(store, 
meta.location).with_file_size(meta.size);
         // Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug
         match ParquetRecordBatchStreamBuilder::new(object_reader).await {
             Ok(_) => panic!("expected failure"),
@@ -280,7 +333,9 @@ mod tests {
 
         let initial_actions = num_actions.load(Ordering::Relaxed);
 
-        let reader = ParquetObjectReader::new(store, 
meta).with_runtime(rt.handle().clone());
+        let reader = ParquetObjectReader::new(store, meta.location)
+            .with_file_size(meta.size)
+            .with_runtime(rt.handle().clone());
 
         let builder = 
ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
         let batches: Vec<_> = 
builder.build().unwrap().try_collect().await.unwrap();
@@ -306,7 +361,9 @@ mod tests {
 
         let (meta, store) = get_meta_store().await;
 
-        let reader = ParquetObjectReader::new(store, 
meta).with_runtime(rt.handle().clone());
+        let reader = ParquetObjectReader::new(store, meta.location)
+            .with_file_size(meta.size)
+            .with_runtime(rt.handle().clone());
 
         let current_id = std::thread::current().id();
 
@@ -329,7 +386,9 @@ mod tests {
 
         let (meta, store) = get_meta_store().await;
 
-        let mut reader = ParquetObjectReader::new(store, 
meta).with_runtime(rt.handle().clone());
+        let mut reader = ParquetObjectReader::new(store, meta.location)
+            .with_file_size(meta.size)
+            .with_runtime(rt.handle().clone());
 
         rt.shutdown_background();
 
diff --git a/parquet/src/file/metadata/reader.rs 
b/parquet/src/file/metadata/reader.rs
index 8532b59667..c2423ecc1b 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -40,7 +40,7 @@ use crate::schema::types::SchemaDescriptor;
 use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
 
 #[cfg(all(feature = "async", feature = "arrow"))]
-use crate::arrow::async_reader::MetadataFetch;
+use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
 
 /// Reads the [`ParquetMetaData`] from a byte stream.
 ///
@@ -403,6 +403,20 @@ impl ParquetMetaDataReader {
         self.finish()
     }
 
+    /// Given a [`MetadataSuffixFetch`], parse and return the 
[`ParquetMetaData`] in a single pass.
+    ///
+    /// This call will consume `self`.
+    ///
+    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the 
number of fetches
+    /// performed by this function.
+    #[cfg(all(feature = "async", feature = "arrow"))]
+    pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
+        mut self,
+        fetch: F,
+    ) -> Result<ParquetMetaData> {
+        self.try_load_via_suffix(fetch).await?;
+        self.finish()
+    }
     /// Attempts to (asynchronously) parse the footer metadata (and optionally 
page indexes)
     /// given a [`MetadataFetch`].
     ///
@@ -414,9 +428,29 @@ impl ParquetMetaDataReader {
         mut fetch: F,
         file_size: usize,
     ) -> Result<()> {
-        let (metadata, remainder) = self
-            .load_metadata(&mut fetch, file_size, self.get_prefetch_size())
-            .await?;
+        let (metadata, remainder) = self.load_metadata(&mut fetch, 
file_size).await?;
+
+        self.metadata = Some(metadata);
+
+        // we can return if page indexes aren't requested
+        if !self.column_index && !self.offset_index {
+            return Ok(());
+        }
+
+        self.load_page_index_with_remainder(fetch, remainder).await
+    }
+
+    /// Attempts to (asynchronously) parse the footer metadata (and optionally 
page indexes)
+    /// given a [`MetadataSuffixFetch`].
+    ///
+    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the 
number of fetches
+    /// performed by this function.
+    #[cfg(all(feature = "async", feature = "arrow"))]
+    pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
+        &mut self,
+        mut fetch: F,
+    ) -> Result<()> {
+        let (metadata, remainder) = self.load_metadata_via_suffix(&mut 
fetch).await?;
 
         self.metadata = Some(metadata);
 
@@ -587,8 +621,9 @@ impl ParquetMetaDataReader {
         &self,
         fetch: &mut F,
         file_size: usize,
-        prefetch: usize,
     ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
+        let prefetch = self.get_prefetch_size();
+
         if file_size < FOOTER_SIZE {
             return Err(eof_err!("file size of {} is less than footer", 
file_size));
         }
@@ -638,6 +673,54 @@ impl ParquetMetaDataReader {
         }
     }
 
+    #[cfg(all(feature = "async", feature = "arrow"))]
+    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
+        &self,
+        fetch: &mut F,
+    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
+        let prefetch = self.get_prefetch_size();
+
+        let suffix = fetch.fetch_suffix(prefetch).await?;
+        let suffix_len = suffix.len();
+
+        if suffix_len < FOOTER_SIZE {
+            return Err(eof_err!(
+                "footer metadata requires {} bytes, but could only read {}",
+                FOOTER_SIZE,
+                suffix_len
+            ));
+        }
+
+        let mut footer = [0; FOOTER_SIZE];
+        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
+
+        let footer = Self::decode_footer_tail(&footer)?;
+        let length = footer.metadata_length();
+
+        // Did not fetch the entire file metadata in the initial read, need to 
make a second request
+        let metadata_offset = length + FOOTER_SIZE;
+        if length > suffix_len - FOOTER_SIZE {
+            let meta = fetch.fetch_suffix(metadata_offset).await?;
+
+            if meta.len() < metadata_offset {
+                return Err(eof_err!(
+                    "metadata requires {} bytes, but could only read {}",
+                    metadata_offset,
+                    meta.len()
+                ));
+            }
+
+            Ok((self.decode_footer_metadata(&meta, &footer)?, None))
+        } else {
+            let metadata_start = suffix_len - metadata_offset;
+            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
+            Ok((
+                self.decode_footer_metadata(slice, &footer)?,
+                Some((0, suffix.slice(..metadata_start))),
+            ))
+        }
+    }
+
     /// Decodes the end of the Parquet footer
     ///
     /// There are 8 bytes at the end of the Parquet footer with the following 
layout:
@@ -1131,6 +1214,30 @@ mod async_tests {
         }
     }
 
+    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);
+
+    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
+    where
+        F1: FnMut(Range<usize>) -> Fut + Send,
+        Fut: Future<Output = Result<Bytes>> + Send,
+        F2: Send,
+    {
+        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, 
Result<Bytes>> {
+            async move { self.0(range).await }.boxed()
+        }
+    }
+
+    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
+    where
+        F1: FnMut(Range<usize>) -> Fut + Send,
+        F2: FnMut(usize) -> Fut + Send,
+        Fut: Future<Output = Result<Bytes>> + Send,
+    {
+        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, 
Result<Bytes>> {
+            async move { self.1(suffix).await }.boxed()
+        }
+    }
+
     fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
         file.seek(SeekFrom::Start(range.start as _))?;
         let len = range.end - range.start;
@@ -1139,6 +1246,15 @@ mod async_tests {
         Ok(buf.into())
     }
 
+    fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
+        let file_len = file.len();
+        // Don't seek before beginning of file
+        file.seek(SeekFrom::End(0 - suffix.min(file_len as usize) as i64))?;
+        let mut buf = Vec::with_capacity(suffix);
+        file.take(suffix as _).read_to_end(&mut buf)?;
+        Ok(buf.into())
+    }
+
     #[tokio::test]
     async fn test_simple() {
         let mut file = get_test_file("nulls.snappy.parquet");
@@ -1224,6 +1340,90 @@ mod async_tests {
         assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
     }
 
+    #[tokio::test]
+    async fn test_suffix() {
+        let mut file = get_test_file("nulls.snappy.parquet");
+        let mut file2 = file.try_clone().unwrap();
+
+        let expected = ParquetMetaDataReader::new()
+            .parse_and_finish(&file)
+            .unwrap();
+        let expected = expected.file_metadata().schema();
+        let fetch_count = AtomicUsize::new(0);
+        let suffix_fetch_count = AtomicUsize::new(0);
+
+        let mut fetch = |range| {
+            fetch_count.fetch_add(1, Ordering::SeqCst);
+            futures::future::ready(read_range(&mut file, range))
+        };
+        let mut suffix_fetch = |suffix| {
+            suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
+            futures::future::ready(read_suffix(&mut file2, suffix))
+        };
+
+        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
+        let actual = ParquetMetaDataReader::new()
+            .load_via_suffix_and_finish(input)
+            .await
+            .unwrap();
+        assert_eq!(actual.file_metadata().schema(), expected);
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
+        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
+
+        // Metadata hint too small - below footer size
+        fetch_count.store(0, Ordering::SeqCst);
+        suffix_fetch_count.store(0, Ordering::SeqCst);
+        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
+        let actual = ParquetMetaDataReader::new()
+            .with_prefetch_hint(Some(7))
+            .load_via_suffix_and_finish(input)
+            .await
+            .unwrap();
+        assert_eq!(actual.file_metadata().schema(), expected);
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
+        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
+
+        // Metadata hint too small
+        fetch_count.store(0, Ordering::SeqCst);
+        suffix_fetch_count.store(0, Ordering::SeqCst);
+        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
+        let actual = ParquetMetaDataReader::new()
+            .with_prefetch_hint(Some(10))
+            .load_via_suffix_and_finish(input)
+            .await
+            .unwrap();
+        assert_eq!(actual.file_metadata().schema(), expected);
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
+        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
+
+        dbg!("test");
+        // Metadata hint too large
+        fetch_count.store(0, Ordering::SeqCst);
+        suffix_fetch_count.store(0, Ordering::SeqCst);
+        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
+        let actual = ParquetMetaDataReader::new()
+            .with_prefetch_hint(Some(500))
+            .load_via_suffix_and_finish(input)
+            .await
+            .unwrap();
+        assert_eq!(actual.file_metadata().schema(), expected);
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
+        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
+
+        // Metadata hint exactly correct
+        fetch_count.store(0, Ordering::SeqCst);
+        suffix_fetch_count.store(0, Ordering::SeqCst);
+        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
+        let actual = ParquetMetaDataReader::new()
+            .with_prefetch_hint(Some(428))
+            .load_via_suffix_and_finish(input)
+            .await
+            .unwrap();
+        assert_eq!(actual.file_metadata().schema(), expected);
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
+        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
+    }
+
     #[tokio::test]
     async fn test_page_index() {
         let mut file = get_test_file("alltypes_tiny_pages.parquet");
diff --git a/parquet/tests/arrow_reader/encryption_async.rs 
b/parquet/tests/arrow_reader/encryption_async.rs
index eeac10f574..9c9b324582 100644
--- a/parquet/tests/arrow_reader/encryption_async.rs
+++ b/parquet/tests/arrow_reader/encryption_async.rs
@@ -276,7 +276,7 @@ async fn test_read_encrypted_file_from_object_store() {
         .unwrap();
     let options = 
ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
 
-    let mut reader = ParquetObjectReader::new(store, meta);
+    let mut reader = ParquetObjectReader::new(store, 
meta.location).with_file_size(meta.size);
     let metadata = reader.get_metadata_with_options(&options).await.unwrap();
     let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, 
options)
         .await

Reply via email to