This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new a3598a9939 Parquet: Support reading Parquet metadata via suffix range requests (#7334)
a3598a9939 is described below
commit a3598a993978f9b68bd80c17d524bab4dca78f8c
Author: Kyle Barron <[email protected]>
AuthorDate: Mon Mar 31 10:11:42 2025 -0400
Parquet: Support reading Parquet metadata via suffix range requests (#7334)
* Support reading Parquet metadata via suffix requests
* Update doctest
* Add simple test without file length provided
* address comments
* Switch `file_size` optional arg to `with_file_size` method
* Move `prefetch` lookup into `load_metadata`
* Remove extra call to `max`
* fix doctest
* Add test that suffix requests are made
* more tests
---
parquet/src/arrow/async_reader/metadata.rs | 10 ++
parquet/src/arrow/async_reader/store.rs | 109 ++++++++++---
parquet/src/file/metadata/reader.rs | 210 ++++++++++++++++++++++++-
parquet/tests/arrow_reader/encryption_async.rs | 2 +-
4 files changed, 300 insertions(+), 31 deletions(-)
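
For orientation before the diff, here is a minimal sketch of the two construction paths this change enables. It is not part of the commit; the local store, example path, and tokio entry point are illustrative assumptions.

    use std::sync::Arc;

    use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
    use parquet::arrow::async_reader::ParquetObjectReader;
    use parquet::arrow::ParquetRecordBatchStreamBuilder;
    use parquet::errors::Result;

    #[tokio::main]
    async fn main() -> Result<()> {
        // Hypothetical store and path; any ObjectStore implementation behaves the same way.
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let path = Path::from("data/example.parquet");

        // Without a known file size, metadata is read via suffix range requests.
        let reader = ParquetObjectReader::new(Arc::clone(&store), path.clone());
        let _builder = ParquetRecordBatchStreamBuilder::new(reader).await?;

        // With a known size (e.g. from ObjectStore::head), only bounded range requests are issued.
        let meta = store.head(&path).await?;
        let sized = ParquetObjectReader::new(store, path).with_file_size(meta.size);
        let _builder = ParquetRecordBatchStreamBuilder::new(sized).await?;
        Ok(())
    }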
diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs
index 71d2e57ddd..ff183f4185 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -75,6 +75,16 @@ impl<T: AsyncFileReader> MetadataFetch for &mut T {
}
}
+/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`] via suffix
+/// requests, without knowing the file size
+pub trait MetadataSuffixFetch: MetadataFetch {
+ /// Return a future that fetches the last `n` bytes asynchronously
+ ///
+ /// Note the returned type is a boxed future, often created by
+ /// [FutureExt::boxed]. See the trait documentation for an example
+ fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>>;
+}
+
/// An asynchronous interface to load [`ParquetMetaData`] from an async source
pub struct MetadataLoader<F> {
/// Function that fetches byte ranges asynchronously
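
As an illustration of the new trait (not part of the patch), a custom async source might implement it roughly as below; the `RemoteBlob` type and its `read_range`/`read_last` methods are hypothetical stand-ins for whatever I/O the source actually performs.

    use std::ops::Range;

    use bytes::Bytes;
    use futures::{future::BoxFuture, FutureExt};
    use parquet::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
    use parquet::errors::Result;

    // Hypothetical async handle supporting range reads and "last n bytes" reads.
    struct RemoteBlob;

    impl RemoteBlob {
        async fn read_range(&mut self, _range: Range<usize>) -> Result<Bytes> {
            unimplemented!("fetch the given byte range from the remote source")
        }
        async fn read_last(&mut self, _n: usize) -> Result<Bytes> {
            unimplemented!("fetch the last `n` bytes from the remote source")
        }
    }

    impl MetadataFetch for RemoteBlob {
        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.read_range(range).await }.boxed()
        }
    }

    impl MetadataSuffixFetch for RemoteBlob {
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.read_last(suffix).await }.boxed()
        }
    }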
diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs
index a1e94efd14..9934d2b93d 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -18,12 +18,13 @@
use std::{ops::Range, sync::Arc};
use crate::arrow::arrow_reader::ArrowReaderOptions;
-use crate::arrow::async_reader::AsyncFileReader;
+use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
-use object_store::{path::Path, ObjectMeta, ObjectStore};
+use object_store::{path::Path, ObjectStore};
+use object_store::{GetOptions, GetRange};
use tokio::runtime::Handle;
/// Reads Parquet files in object storage using [`ObjectStore`].
@@ -45,7 +46,7 @@ use tokio::runtime::Handle;
/// println!("Found Blob with {}B at {}", meta.size, meta.location);
///
/// // Show Parquet metadata
-/// let reader = ParquetObjectReader::new(storage_container, meta);
+/// let reader = ParquetObjectReader::new(storage_container, meta.location).with_file_size(meta.size);
/// let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
/// print_parquet_metadata(&mut stdout(), builder.metadata());
/// # }
@@ -53,7 +54,8 @@ use tokio::runtime::Handle;
#[derive(Clone, Debug)]
pub struct ParquetObjectReader {
store: Arc<dyn ObjectStore>,
- meta: ObjectMeta,
+ path: Path,
+ file_size: Option<usize>,
metadata_size_hint: Option<usize>,
preload_column_index: bool,
preload_offset_index: bool,
@@ -61,13 +63,12 @@ pub struct ParquetObjectReader {
}
impl ParquetObjectReader {
- /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`ObjectMeta`]
- ///
- /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`]
- pub fn new(store: Arc<dyn ObjectStore>, meta: ObjectMeta) -> Self {
+ /// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`Path`].
+ pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
Self {
store,
- meta,
+ path,
+ file_size: None,
metadata_size_hint: None,
preload_column_index: false,
preload_offset_index: false,
@@ -84,6 +85,22 @@ impl ParquetObjectReader {
}
}
+ /// Provide the byte size of this file.
+ ///
+ /// If provided, the file size will ensure that only bounded range requests are used. If file
+ /// size is not provided, the reader will use suffix range requests to fetch the metadata.
+ ///
+ /// Providing this size up front is an important optimization to avoid extra calls when the
+ /// underlying store does not support suffix range requests.
+ ///
+ /// The file size can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`].
+ pub fn with_file_size(self, file_size: usize) -> Self {
+ Self {
+ file_size: Some(file_size),
+ ..self
+ }
+ }
+
/// Load the Column Index as part of [`Self::get_metadata`]
pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
Self {
@@ -125,7 +142,7 @@ impl ParquetObjectReader {
{
match &self.runtime {
Some(handle) => {
- let path = self.meta.location.clone();
+ let path = self.path.clone();
let store = Arc::clone(&self.store);
handle
.spawn(async move { f(&store, &path).await })
@@ -138,13 +155,27 @@ impl ParquetObjectReader {
)
.boxed()
}
- None => f(&self.store, &self.meta.location)
- .map_err(|e| e.into())
- .boxed(),
+ None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(),
}
}
}
+impl MetadataSuffixFetch for &mut ParquetObjectReader {
+ fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
+ let options = GetOptions {
+ range: Some(GetRange::Suffix(suffix)),
+ ..Default::default()
+ };
+ self.spawn(|store, path| {
+ async move {
+ let resp = store.get_opts(path, options).await?;
+ Ok::<_, ParquetError>(resp.bytes().await?)
+ }
+ .boxed()
+ })
+ }
+}
+
impl AsyncFileReader for ParquetObjectReader {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
self.spawn(|store, path| store.get_range(path, range))
@@ -165,13 +196,16 @@ impl AsyncFileReader for ParquetObjectReader {
// `Self::get_bytes`.
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
- let file_size = self.meta.size;
- let metadata = ParquetMetaDataReader::new()
+ let metadata_reader = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
- .with_prefetch_hint(self.metadata_size_hint)
- .load_and_finish(self, file_size)
- .await?;
+ .with_prefetch_hint(self.metadata_size_hint);
+ let metadata = if let Some(file_size) = self.file_size {
+ metadata_reader.load_and_finish(self, file_size).await?
+ } else {
+ metadata_reader.load_via_suffix_and_finish(self).await?
+ };
+
Ok(Arc::new(metadata))
})
}
@@ -181,7 +215,6 @@ impl AsyncFileReader for ParquetObjectReader {
options: &'a ArrowReaderOptions,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
- let file_size = self.meta.size;
let metadata = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
@@ -191,7 +224,11 @@ impl AsyncFileReader for ParquetObjectReader {
let metadata =
metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
- let metadata = metadata.load_and_finish(self, file_size).await?;
+ let metadata = if let Some(file_size) = self.file_size {
+ metadata.load_and_finish(self, file_size).await?
+ } else {
+ metadata.load_via_suffix_and_finish(self).await?
+ };
Ok(Arc::new(metadata))
})
@@ -231,7 +268,22 @@ mod tests {
#[tokio::test]
async fn test_simple() {
let (meta, store) = get_meta_store().await;
- let object_reader = ParquetObjectReader::new(store, meta);
+ let object_reader =
+ ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
+
+ let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
+ .await
+ .unwrap();
+ let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
+
+ assert_eq!(batches.len(), 1);
+ assert_eq!(batches[0].num_rows(), 8);
+ }
+
+ #[tokio::test]
+ async fn test_simple_without_file_length() {
+ let (meta, store) = get_meta_store().await;
+ let object_reader = ParquetObjectReader::new(store, meta.location);
let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
.await
@@ -247,7 +299,8 @@ mod tests {
let (mut meta, store) = get_meta_store().await;
meta.location = Path::from("I don't exist.parquet");
- let object_reader = ParquetObjectReader::new(store, meta);
+ let object_reader =
+ ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
// Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug
match ParquetRecordBatchStreamBuilder::new(object_reader).await {
Ok(_) => panic!("expected failure"),
@@ -280,7 +333,9 @@ mod tests {
let initial_actions = num_actions.load(Ordering::Relaxed);
- let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());
+ let reader = ParquetObjectReader::new(store, meta.location)
+ .with_file_size(meta.size)
+ .with_runtime(rt.handle().clone());
let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
@@ -306,7 +361,9 @@ mod tests {
let (meta, store) = get_meta_store().await;
- let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());
+ let reader = ParquetObjectReader::new(store, meta.location)
+ .with_file_size(meta.size)
+ .with_runtime(rt.handle().clone());
let current_id = std::thread::current().id();
@@ -329,7 +386,9 @@ mod tests {
let (meta, store) = get_meta_store().await;
- let mut reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());
+ let mut reader = ParquetObjectReader::new(store, meta.location)
+ .with_file_size(meta.size)
+ .with_runtime(rt.handle().clone());
rt.shutdown_background();
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index 8532b59667..c2423ecc1b 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -40,7 +40,7 @@ use crate::schema::types::SchemaDescriptor;
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
#[cfg(all(feature = "async", feature = "arrow"))]
-use crate::arrow::async_reader::MetadataFetch;
+use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
/// Reads the [`ParquetMetaData`] from a byte stream.
///
@@ -403,6 +403,20 @@ impl ParquetMetaDataReader {
self.finish()
}
+ /// Given a [`MetadataSuffixFetch`], parse and return the [`ParquetMetaData`] in a single pass.
+ ///
+ /// This call will consume `self`.
+ ///
+ /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
+ /// performed by this function.
+ #[cfg(all(feature = "async", feature = "arrow"))]
+ pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
+ mut self,
+ fetch: F,
+ ) -> Result<ParquetMetaData> {
+ self.try_load_via_suffix(fetch).await?;
+ self.finish()
+ }
/// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
/// given a [`MetadataFetch`].
///
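
A minimal sketch (not part of the patch) of driving the new `load_via_suffix_and_finish` entry point; the `read_metadata` helper and the 64 KiB hint are illustrative, and `source` is any `MetadataSuffixFetch` implementation such as the hypothetical `RemoteBlob` above.

    use parquet::arrow::async_reader::MetadataSuffixFetch;
    use parquet::errors::Result;
    use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

    // Load Parquet metadata from a source of unknown length using suffix requests.
    async fn read_metadata<F: MetadataSuffixFetch>(source: F) -> Result<ParquetMetaData> {
        ParquetMetaDataReader::new()
            // Optional: a generous prefetch hint often retrieves footer + metadata in one request
            .with_prefetch_hint(Some(64 * 1024))
            .load_via_suffix_and_finish(source)
            .await
    }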
@@ -414,9 +428,29 @@ impl ParquetMetaDataReader {
mut fetch: F,
file_size: usize,
) -> Result<()> {
- let (metadata, remainder) = self
- .load_metadata(&mut fetch, file_size, self.get_prefetch_size())
- .await?;
+ let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
+
+ self.metadata = Some(metadata);
+
+ // we can return if page indexes aren't requested
+ if !self.column_index && !self.offset_index {
+ return Ok(());
+ }
+
+ self.load_page_index_with_remainder(fetch, remainder).await
+ }
+
+ /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
+ /// given a [`MetadataSuffixFetch`].
+ ///
+ /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
+ /// performed by this function.
+ #[cfg(all(feature = "async", feature = "arrow"))]
+ pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
+ &mut self,
+ mut fetch: F,
+ ) -> Result<()> {
+ let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;
self.metadata = Some(metadata);
@@ -587,8 +621,9 @@ impl ParquetMetaDataReader {
&self,
fetch: &mut F,
file_size: usize,
- prefetch: usize,
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
+ let prefetch = self.get_prefetch_size();
+
if file_size < FOOTER_SIZE {
return Err(eof_err!("file size of {} is less than footer", file_size));
}
@@ -638,6 +673,54 @@ impl ParquetMetaDataReader {
}
}
+ #[cfg(all(feature = "async", feature = "arrow"))]
+ async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
+ &self,
+ fetch: &mut F,
+ ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
+ let prefetch = self.get_prefetch_size();
+
+ let suffix = fetch.fetch_suffix(prefetch).await?;
+ let suffix_len = suffix.len();
+
+ if suffix_len < FOOTER_SIZE {
+ return Err(eof_err!(
+ "footer metadata requires {} bytes, but could only read {}",
+ FOOTER_SIZE,
+ suffix_len
+ ));
+ }
+
+ let mut footer = [0; FOOTER_SIZE];
+ footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
+
+ let footer = Self::decode_footer_tail(&footer)?;
+ let length = footer.metadata_length();
+
+ // Did not fetch the entire file metadata in the initial read, need to make a second request
+ let metadata_offset = length + FOOTER_SIZE;
+ if length > suffix_len - FOOTER_SIZE {
+ let meta = fetch.fetch_suffix(metadata_offset).await?;
+
+ if meta.len() < metadata_offset {
+ return Err(eof_err!(
+ "metadata requires {} bytes, but could only read {}",
+ metadata_offset,
+ meta.len()
+ ));
+ }
+
+ Ok((self.decode_footer_metadata(&meta, &footer)?, None))
+ } else {
+ let metadata_start = suffix_len - metadata_offset;
+ let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
+ Ok((
+ self.decode_footer_metadata(slice, &footer)?,
+ Some((0, suffix.slice(..metadata_start))),
+ ))
+ }
+ }
+
/// Decodes the end of the Parquet footer
///
/// There are 8 bytes at the end of the Parquet footer with the following layout:
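
To make the suffix-fetch arithmetic above concrete (figures inferred from the `test_suffix` case below): with no prefetch hint the first suffix request covers only the 8-byte footer, whose `metadata_length` for `nulls.snappy.parquet` appears to be 420 bytes, so a second suffix request of 420 + 8 = 428 bytes is needed, two fetches in total. A prefetch hint of at least 428 bytes (the "exactly correct" value in the test) lets the metadata be decoded from the first response, reducing this to a single fetch.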
@@ -1131,6 +1214,30 @@ mod async_tests {
}
}
+ struct MetadataSuffixFetchFn<F1, F2>(F1, F2);
+
+ impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
+ where
+ F1: FnMut(Range<usize>) -> Fut + Send,
+ Fut: Future<Output = Result<Bytes>> + Send,
+ F2: Send,
+ {
+ fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+ async move { self.0(range).await }.boxed()
+ }
+ }
+
+ impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
+ where
+ F1: FnMut(Range<usize>) -> Fut + Send,
+ F2: FnMut(usize) -> Fut + Send,
+ Fut: Future<Output = Result<Bytes>> + Send,
+ {
+ fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
+ async move { self.1(suffix).await }.boxed()
+ }
+ }
+
fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
file.seek(SeekFrom::Start(range.start as _))?;
let len = range.end - range.start;
@@ -1139,6 +1246,15 @@ mod async_tests {
Ok(buf.into())
}
+ fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
+ let file_len = file.len();
+ // Don't seek before beginning of file
+ file.seek(SeekFrom::End(0 - suffix.min(file_len as usize) as i64))?;
+ let mut buf = Vec::with_capacity(suffix);
+ file.take(suffix as _).read_to_end(&mut buf)?;
+ Ok(buf.into())
+ }
+
#[tokio::test]
async fn test_simple() {
let mut file = get_test_file("nulls.snappy.parquet");
@@ -1224,6 +1340,90 @@ mod async_tests {
assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
}
+ #[tokio::test]
+ async fn test_suffix() {
+ let mut file = get_test_file("nulls.snappy.parquet");
+ let mut file2 = file.try_clone().unwrap();
+
+ let expected = ParquetMetaDataReader::new()
+ .parse_and_finish(&file)
+ .unwrap();
+ let expected = expected.file_metadata().schema();
+ let fetch_count = AtomicUsize::new(0);
+ let suffix_fetch_count = AtomicUsize::new(0);
+
+ let mut fetch = |range| {
+ fetch_count.fetch_add(1, Ordering::SeqCst);
+ futures::future::ready(read_range(&mut file, range))
+ };
+ let mut suffix_fetch = |suffix| {
+ suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
+ futures::future::ready(read_suffix(&mut file2, suffix))
+ };
+
+ let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
+ let actual = ParquetMetaDataReader::new()
+ .load_via_suffix_and_finish(input)
+ .await
+ .unwrap();
+ assert_eq!(actual.file_metadata().schema(), expected);
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
+ assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
+
+ // Metadata hint too small - below footer size
+ fetch_count.store(0, Ordering::SeqCst);
+ suffix_fetch_count.store(0, Ordering::SeqCst);
+ let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
+ let actual = ParquetMetaDataReader::new()
+ .with_prefetch_hint(Some(7))
+ .load_via_suffix_and_finish(input)
+ .await
+ .unwrap();
+ assert_eq!(actual.file_metadata().schema(), expected);
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
+ assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
+
+ // Metadata hint too small
+ fetch_count.store(0, Ordering::SeqCst);
+ suffix_fetch_count.store(0, Ordering::SeqCst);
+ let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
+ let actual = ParquetMetaDataReader::new()
+ .with_prefetch_hint(Some(10))
+ .load_via_suffix_and_finish(input)
+ .await
+ .unwrap();
+ assert_eq!(actual.file_metadata().schema(), expected);
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
+ assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
+
+ // Metadata hint too large
+ fetch_count.store(0, Ordering::SeqCst);
+ suffix_fetch_count.store(0, Ordering::SeqCst);
+ let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
+ let actual = ParquetMetaDataReader::new()
+ .with_prefetch_hint(Some(500))
+ .load_via_suffix_and_finish(input)
+ .await
+ .unwrap();
+ assert_eq!(actual.file_metadata().schema(), expected);
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
+ assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
+
+ // Metadata hint exactly correct
+ fetch_count.store(0, Ordering::SeqCst);
+ suffix_fetch_count.store(0, Ordering::SeqCst);
+ let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
+ let actual = ParquetMetaDataReader::new()
+ .with_prefetch_hint(Some(428))
+ .load_via_suffix_and_finish(input)
+ .await
+ .unwrap();
+ assert_eq!(actual.file_metadata().schema(), expected);
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
+ assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
+ }
+
#[tokio::test]
async fn test_page_index() {
let mut file = get_test_file("alltypes_tiny_pages.parquet");
diff --git a/parquet/tests/arrow_reader/encryption_async.rs b/parquet/tests/arrow_reader/encryption_async.rs
index eeac10f574..9c9b324582 100644
--- a/parquet/tests/arrow_reader/encryption_async.rs
+++ b/parquet/tests/arrow_reader/encryption_async.rs
@@ -276,7 +276,7 @@ async fn test_read_encrypted_file_from_object_store() {
.unwrap();
let options =
ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
- let mut reader = ParquetObjectReader::new(store, meta);
+ let mut reader = ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
let metadata = reader.get_metadata_with_options(&options).await.unwrap();
let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
.await