This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 3a45ae9e9e Remove `AsyncFileReader::get_metadata_with_options`, add
`options` to `AsyncFileReader::get_metadata` (#7342)
3a45ae9e9e is described below
commit 3a45ae9e9e9c6791c8c0b4af903d87cedca259fa
Author: Corwin Joy <[email protected]>
AuthorDate: Tue Apr 1 04:48:30 2025 -0700
Remove `AsyncFileReader::get_metadata_with_options`, add `options` to
`AsyncFileReader::get_metadata` (#7342)
* Remove default implementation for get_metadata_with_options and explain
why.
* Improve comment about missing implementation.
* Update get_metadata method to take an optional ArrowReaderOptions.
* Fix handling of unencrypted footer.
* While checking the git tests, found and fixed a couple of dead calls that I had missed.
* Fix path with no encryption enabled.
* Make have_decryptor a bit more bullet-proof.
* Update expected error message in
test_decrypting_without_decryption_properties_fails.
* Tidy up get_metadata
* Fix merge conflicts.
---------
Co-authored-by: Adam Reeve <[email protected]>
---
parquet/examples/read_with_rowgroup.rs | 2 +-
parquet/src/arrow/async_reader/mod.rs | 75 ++++++--------------------
parquet/src/arrow/async_reader/store.rs | 28 +++-------
parquet/src/file/metadata/reader.rs | 2 +-
parquet/tests/arrow_reader/encryption.rs | 2 +-
parquet/tests/arrow_reader/encryption_async.rs | 4 +-
6 files changed, 28 insertions(+), 85 deletions(-)
diff --git a/parquet/examples/read_with_rowgroup.rs
b/parquet/examples/read_with_rowgroup.rs
index 8cccc7fe14..52b3d11227 100644
--- a/parquet/examples/read_with_rowgroup.rs
+++ b/parquet/examples/read_with_rowgroup.rs
@@ -35,7 +35,7 @@ async fn main() -> Result<()> {
let mut file = File::open(&path).await.unwrap();
// The metadata could be cached in other places, this example only shows
how to read
- let metadata = file.get_metadata().await?;
+ let metadata = file.get_metadata(None).await?;
for rg in metadata.row_groups() {
let mut rowgroup = InMemoryRowGroup::create(rg.clone(),
ProjectionMask::all());
diff --git a/parquet/src/arrow/async_reader/mod.rs
b/parquet/src/arrow/async_reader/mod.rs
index b0d5b0a710..5d5a7036ee 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -101,20 +101,11 @@ pub trait AsyncFileReader: Send {
/// Provides asynchronous access to the [`ParquetMetaData`] of a parquet
file,
/// allowing fine-grained control over how metadata is sourced, in
particular allowing
/// for caching, pre-fetching, catalog metadata, etc...
- fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
-
- /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet
file,
- /// allowing fine-grained control over how metadata is sourced, in
particular allowing
- /// for caching, pre-fetching, catalog metadata, decrypting, etc...
- ///
- /// By default calls `get_metadata()`
- fn get_metadata_with_options<'a>(
+ /// ArrowReaderOptions may be provided to supply decryption parameters
+ fn get_metadata<'a>(
&'a mut self,
- options: &'a ArrowReaderOptions,
- ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
- let _ = options;
- self.get_metadata()
- }
+ options: Option<&'a ArrowReaderOptions>,
+ ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
}
/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
@@ -127,15 +118,11 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
self.as_mut().get_byte_ranges(ranges)
}
- fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
- self.as_mut().get_metadata()
- }
-
- fn get_metadata_with_options<'a>(
+ fn get_metadata<'a>(
&'a mut self,
- options: &'a ArrowReaderOptions,
+ options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
- self.as_mut().get_metadata_with_options(options)
+ self.as_mut().get_metadata(options)
}
}
@@ -156,9 +143,9 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send>
AsyncFileReader for T {
.boxed()
}
- fn get_metadata_with_options<'a>(
+ fn get_metadata<'a>(
&'a mut self,
- options: &'a ArrowReaderOptions,
+ options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
async move {
@@ -169,6 +156,7 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send>
AsyncFileReader for T {
let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
let metadata_len = footer.metadata_length();
+
self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
.await?;
@@ -178,8 +166,9 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send>
AsyncFileReader for T {
let metadata_reader = ParquetMetaDataReader::new();
#[cfg(feature = "encryption")]
- let metadata_reader = metadata_reader
-
.with_decryption_properties(options.file_decryption_properties.as_ref());
+ let metadata_reader = metadata_reader.with_decryption_properties(
+ options.and_then(|o| o.file_decryption_properties.as_ref()),
+ );
let parquet_metadata =
metadata_reader.decode_footer_metadata(&buf, &footer)?;
@@ -187,34 +176,6 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send>
AsyncFileReader for T {
}
.boxed()
}
-
- fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
- const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
- async move {
- self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;
-
- let mut buf = [0_u8; FOOTER_SIZE];
- self.read_exact(&mut buf).await?;
-
- let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
- let metadata_len = footer.metadata_length();
-
- if footer.is_encrypted_footer() {
- return Err(general_err!(
- "Parquet file has an encrypted footer but decryption
properties were not provided"
- ));
- }
-
- self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
- .await?;
-
- let mut buf = Vec::with_capacity(metadata_len);
- self.take(metadata_len as _).read_to_end(&mut buf).await?;
-
- Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&buf)?))
- }
- .boxed()
- }
}
impl ArrowReaderMetadata {
@@ -233,7 +194,7 @@ impl ArrowReaderMetadata {
) -> Result<Self> {
// TODO: this is all rather awkward. It would be nice if
AsyncFileReader::get_metadata
// took an argument to fetch the page indexes.
- let mut metadata = input.get_metadata_with_options(&options).await?;
+ let mut metadata = input.get_metadata(Some(&options)).await?;
if options.page_index
&& metadata.column_index().is_none()
@@ -1169,13 +1130,9 @@ mod tests {
futures::future::ready(Ok(self.data.slice(range))).boxed()
}
- fn get_metadata(&mut self) -> BoxFuture<'_,
Result<Arc<ParquetMetaData>>> {
- futures::future::ready(Ok(self.metadata.clone())).boxed()
- }
-
- fn get_metadata_with_options<'a>(
+ fn get_metadata<'a>(
&'a mut self,
- _options: &'a ArrowReaderOptions,
+ _options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
futures::future::ready(Ok(self.metadata.clone())).boxed()
}
diff --git a/parquet/src/arrow/async_reader/store.rs
b/parquet/src/arrow/async_reader/store.rs
index 9934d2b93d..dff7d9362a 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -194,35 +194,21 @@ impl AsyncFileReader for ParquetObjectReader {
// an `impl MetadataFetch` and calls those methods to get data from it.
Due to `Self`'s impl of
// `AsyncFileReader`, the calls to `MetadataFetch::fetch` are just
delegated to
// `Self::get_bytes`.
- fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
- Box::pin(async move {
- let metadata_reader = ParquetMetaDataReader::new()
- .with_column_indexes(self.preload_column_index)
- .with_offset_indexes(self.preload_offset_index)
- .with_prefetch_hint(self.metadata_size_hint);
- let metadata = if let Some(file_size) = self.file_size {
- metadata_reader.load_and_finish(self, file_size).await?
- } else {
- metadata_reader.load_via_suffix_and_finish(self).await?
- };
-
- Ok(Arc::new(metadata))
- })
- }
-
- fn get_metadata_with_options<'a>(
+ fn get_metadata<'a>(
&'a mut self,
- options: &'a ArrowReaderOptions,
+ options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
- let metadata = ParquetMetaDataReader::new()
+ let mut metadata = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
.with_prefetch_hint(self.metadata_size_hint);
#[cfg(feature = "encryption")]
- let metadata =
-
metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
+ if let Some(options) = options {
+ metadata = metadata
+
.with_decryption_properties(options.file_decryption_properties.as_ref());
+ }
let metadata = if let Some(file_size) = self.file_size {
metadata.load_and_finish(self, file_size).await?
diff --git a/parquet/src/file/metadata/reader.rs
b/parquet/src/file/metadata/reader.rs
index c2423ecc1b..f0eedd2bb9 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -834,7 +834,7 @@ impl ParquetMetaDataReader {
file_decryptor = Some(decryptor);
} else {
- return Err(general_err!("Parquet file has an encrypted footer
but no decryption properties were provided"));
+ return Err(general_err!("Parquet file has an encrypted footer
but decryption properties were not provided"));
}
}
diff --git a/parquet/tests/arrow_reader/encryption.rs
b/parquet/tests/arrow_reader/encryption.rs
index 521212488a..362a58772a 100644
--- a/parquet/tests/arrow_reader/encryption.rs
+++ b/parquet/tests/arrow_reader/encryption.rs
@@ -154,7 +154,7 @@ fn test_decrypting_without_decryption_properties_fails() {
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
- "Parquet error: Parquet file has an encrypted footer but no decryption
properties were provided"
+ "Parquet error: Parquet file has an encrypted footer but decryption
properties were not provided"
);
}
diff --git a/parquet/tests/arrow_reader/encryption_async.rs
b/parquet/tests/arrow_reader/encryption_async.rs
index 9c9b324582..60eb97363d 100644
--- a/parquet/tests/arrow_reader/encryption_async.rs
+++ b/parquet/tests/arrow_reader/encryption_async.rs
@@ -239,7 +239,7 @@ async fn
test_decrypting_without_decryption_properties_fails() {
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
- "Parquet error: Parquet file has an encrypted footer but no decryption
properties were provided"
+ "Parquet error: Parquet file has an encrypted footer but decryption
properties were not provided"
);
}
@@ -277,7 +277,7 @@ async fn test_read_encrypted_file_from_object_store() {
let options =
ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
let mut reader = ParquetObjectReader::new(store,
meta.location).with_file_size(meta.size);
- let metadata = reader.get_metadata_with_options(&options).await.unwrap();
+ let metadata = reader.get_metadata(Some(&options)).await.unwrap();
let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader,
options)
.await
.unwrap();