This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 3293a8c2f9 Deprecate `MetadataLoader` (#6474)
3293a8c2f9 is described below
commit 3293a8c2f9062fca93bee2210d540a1d25155bf5
Author: Ed Seidl <[email protected]>
AuthorDate: Mon Sep 30 11:11:22 2024 -0700
Deprecate `MetadataLoader` (#6474)
* deprecate MetadataLoader
* change signature of the load functions
* fix up fetch_parquet_metadata
* can now use self.meta.size directly
* revert changes to load API
* revert change to test code
---
parquet/src/arrow/async_reader/metadata.rs | 12 ++++++--
parquet/src/arrow/async_reader/mod.rs | 8 +++--
parquet/src/arrow/async_reader/store.rs | 17 +++++------
parquet/src/file/metadata/reader.rs | 48 ++++++++++++++----------------
parquet/src/file/metadata/writer.rs | 12 ++++----
parquet/tests/arrow_reader/bad_data.rs | 17 ++++++-----
6 files changed, 60 insertions(+), 54 deletions(-)
diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs
index cd45d2abdb..b7fac6fe7c 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -52,6 +52,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
/// Create a new [`MetadataLoader`] by reading the footer information
///
/// See [`fetch_parquet_metadata`] for the meaning of the individual
parameters
+ #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>)
-> Result<Self> {
if file_size < FOOTER_SIZE {
return Err(ParquetError::EOF(format!(
@@ -108,6 +109,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
}
/// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
+ #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
Self {
fetch,
@@ -120,6 +122,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
///
/// * `column_index`: if true will load column index
/// * `offset_index`: if true will load offset index
+ #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn load_page_index(&mut self, column_index: bool, offset_index:
bool) -> Result<()> {
if !column_index && !offset_index {
return Ok(());
@@ -226,6 +229,7 @@ where
/// in the first request, instead of 8, and only issue further requests
/// if additional bytes are needed. Providing a `prefetch` hint can therefore
/// significantly reduce the number of `fetch` requests, and consequently
latency
+#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn fetch_parquet_metadata<F, Fut>(
fetch: F,
file_size: usize,
@@ -236,10 +240,14 @@ where
Fut: Future<Output = Result<Bytes>> + Send,
{
let fetch = MetadataFetchFn(fetch);
- let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
- Ok(loader.finish())
+ ParquetMetaDataReader::new()
+ .with_prefetch_hint(prefetch)
+ .load_and_finish(fetch, file_size)
+ .await
}
+// these tests are all replicated in parquet::file::metadata::reader
+#[allow(deprecated)]
#[cfg(test)]
mod tests {
use super::*;
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 89e4d6adb5..5e8bdbc02e 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -212,6 +212,8 @@ impl ArrowReaderMetadata {
input: &mut T,
options: ArrowReaderOptions,
) -> Result<Self> {
+ // TODO: this is all rather awkward. It would be nice if
AsyncFileReader::get_metadata
+ // took an argument to fetch the page indexes.
let mut metadata = input.get_metadata().await?;
if options.page_index
@@ -219,9 +221,9 @@ impl ArrowReaderMetadata {
&& metadata.offset_index().is_none()
{
let m = Arc::try_unwrap(metadata).unwrap_or_else(|e|
e.as_ref().clone());
- let mut loader = MetadataLoader::new(input, m);
- loader.load_page_index(true, true).await?;
- metadata = Arc::new(loader.finish())
+ let mut reader =
ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
+ reader.load_page_index(input).await?;
+ metadata = Arc::new(reader.finish()?)
}
Self::try_new(metadata, options)
}
diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs
index 77c00e91a3..e6b47856eb 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -24,9 +24,9 @@ use futures::{FutureExt, TryFutureExt};
use object_store::{ObjectMeta, ObjectStore};
-use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader};
+use crate::arrow::async_reader::AsyncFileReader;
use crate::errors::Result;
-use crate::file::metadata::ParquetMetaData;
+use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
/// Reads Parquet files in object storage using [`ObjectStore`].
///
@@ -124,15 +124,14 @@ impl AsyncFileReader for ParquetObjectReader {
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
- let preload_column_index = self.preload_column_index;
- let preload_offset_index = self.preload_offset_index;
let file_size = self.meta.size;
- let prefetch = self.metadata_size_hint;
- let mut loader = MetadataLoader::load(self, file_size,
prefetch).await?;
- loader
- .load_page_index(preload_column_index, preload_offset_index)
+ let metadata = ParquetMetaDataReader::new()
+ .with_column_indexes(self.preload_column_index)
+ .with_offset_indexes(self.preload_offset_index)
+ .with_prefetch_hint(self.metadata_size_hint)
+ .load_and_finish(self, file_size)
.await?;
- Ok(Arc::new(loader.finish()))
+ Ok(Arc::new(metadata))
})
}
}
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index 9e00c68604..3fd2bd76f6 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -329,13 +329,18 @@ impl ParquetMetaDataReader {
return Ok(());
}
- self.load_page_index(fetch, remainder).await
+ self.load_page_index_with_remainder(fetch, remainder).await
}
/// Asynchronously fetch the page index structures when a
[`ParquetMetaData`] has already
/// been obtained. See [`Self::new_with_metadata()`].
#[cfg(feature = "async")]
- pub async fn load_page_index<F: MetadataFetch>(
+ pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) ->
Result<()> {
+ self.load_page_index_with_remainder(fetch, None).await
+ }
+
+ #[cfg(feature = "async")]
+ async fn load_page_index_with_remainder<F: MetadataFetch>(
&mut self,
mut fetch: F,
remainder: Option<(usize, Bytes)>,
@@ -836,7 +841,7 @@ mod async_tests {
struct MetadataFetchFn<F>(F);
- impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
+ impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn<F>
where
F: FnMut(Range<usize>) -> Fut + Send,
Fut: Future<Output = Result<Bytes>> + Send,
@@ -865,14 +870,14 @@ mod async_tests {
let expected = expected.file_metadata().schema();
let fetch_count = AtomicUsize::new(0);
- let mut fetch = |range| {
+ let fetch = |range| {
fetch_count.fetch_add(1, Ordering::SeqCst);
futures::future::ready(read_range(&mut file, range))
};
- let input = MetadataFetchFn(&mut fetch);
+ let mut f = MetadataFetchFn(fetch);
let actual = ParquetMetaDataReader::new()
- .load_and_finish(input, len)
+ .load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
@@ -880,10 +885,9 @@ mod async_tests {
// Metadata hint too small - below footer size
fetch_count.store(0, Ordering::SeqCst);
- let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(7))
- .load_and_finish(input, len)
+ .load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
@@ -891,10 +895,9 @@ mod async_tests {
// Metadata hint too small
fetch_count.store(0, Ordering::SeqCst);
- let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(10))
- .load_and_finish(input, len)
+ .load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
@@ -902,10 +905,9 @@ mod async_tests {
// Metadata hint too large
fetch_count.store(0, Ordering::SeqCst);
- let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(500))
- .load_and_finish(input, len)
+ .load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
@@ -913,26 +915,23 @@ mod async_tests {
// Metadata hint exactly correct
fetch_count.store(0, Ordering::SeqCst);
- let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(428))
- .load_and_finish(input, len)
+ .load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
- let input = MetadataFetchFn(&mut fetch);
let err = ParquetMetaDataReader::new()
- .load_and_finish(input, 4)
+ .load_and_finish(&mut f, 4)
.await
.unwrap_err()
.to_string();
assert_eq!(err, "EOF: file size of 4 is less than footer");
- let input = MetadataFetchFn(&mut fetch);
let err = ParquetMetaDataReader::new()
- .load_and_finish(input, 20)
+ .load_and_finish(&mut f, 20)
.await
.unwrap_err()
.to_string();
@@ -949,42 +948,39 @@ mod async_tests {
futures::future::ready(read_range(&mut file, range))
};
- let f = MetadataFetchFn(&mut fetch);
+ let mut f = MetadataFetchFn(&mut fetch);
let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
- loader.try_load(f, len).await.unwrap();
+ loader.try_load(&mut f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() &&
metadata.column_index().is_some());
// Prefetch just footer exactly
fetch_count.store(0, Ordering::SeqCst);
- let f = MetadataFetchFn(&mut fetch);
let mut loader = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(1729));
- loader.try_load(f, len).await.unwrap();
+ loader.try_load(&mut f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() &&
metadata.column_index().is_some());
// Prefetch more than footer but not enough
fetch_count.store(0, Ordering::SeqCst);
- let f = MetadataFetchFn(&mut fetch);
let mut loader = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(130649));
- loader.try_load(f, len).await.unwrap();
+ loader.try_load(&mut f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() &&
metadata.column_index().is_some());
// Prefetch exactly enough
fetch_count.store(0, Ordering::SeqCst);
- let f = MetadataFetchFn(&mut fetch);
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(130650))
- .load_and_finish(f, len)
+ .load_and_finish(&mut f, len)
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs
index db78606e42..44328c635f 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -516,7 +516,7 @@ mod tests {
/// Temporary function so we can test loading metadata with page indexes
/// while we haven't fully figured out how to load it cleanly
async fn load_metadata_from_bytes(file_size: usize, data: Bytes) ->
ParquetMetaData {
- use crate::arrow::async_reader::{MetadataFetch, MetadataLoader};
+ use crate::arrow::async_reader::MetadataFetch;
use crate::errors::Result as ParquetResult;
use futures::future::BoxFuture;
use futures::FutureExt;
@@ -569,13 +569,11 @@ mod tests {
Box::new(AsyncBytes::new(data)),
file_size - metadata_length..file_size,
);
- let metadata = MetadataLoader::load(&mut reader, file_size, None)
+ ParquetMetaDataReader::new()
+ .with_page_indexes(true)
+ .load_and_finish(&mut reader, file_size)
.await
- .unwrap();
- let loaded_metadata = metadata.finish();
- let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
- metadata.load_page_index(true, true).await.unwrap();
- metadata.finish()
+ .unwrap()
}
fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right:
&ColumnChunkMetaData) {
diff --git a/parquet/tests/arrow_reader/bad_data.rs b/parquet/tests/arrow_reader/bad_data.rs
index a73864070d..e2975c17c8 100644
--- a/parquet/tests/arrow_reader/bad_data.rs
+++ b/parquet/tests/arrow_reader/bad_data.rs
@@ -140,20 +140,23 @@ fn read_file(name: &str) -> Result<usize, ParquetError> {
#[tokio::test]
async fn bad_metadata_err() {
use bytes::Bytes;
- use parquet::arrow::async_reader::MetadataLoader;
+ use parquet::file::metadata::ParquetMetaDataReader;
let metadata_buffer =
Bytes::from_static(include_bytes!("bad_raw_metadata.bin"));
let metadata_length = metadata_buffer.len();
let mut reader = std::io::Cursor::new(&metadata_buffer);
- let mut loader = MetadataLoader::load(&mut reader, metadata_length, None)
- .await
- .unwrap();
- loader.load_page_index(false, false).await.unwrap();
- loader.load_page_index(false, true).await.unwrap();
+ let mut loader = ParquetMetaDataReader::new();
+ loader.try_load(&mut reader, metadata_length).await.unwrap();
+ loader = loader.with_page_indexes(false);
+ loader.load_page_index(&mut reader).await.unwrap();
- let err = loader.load_page_index(true, false).await.unwrap_err();
+ loader = loader.with_offset_indexes(true);
+ loader.load_page_index(&mut reader).await.unwrap();
+
+ loader = loader.with_column_indexes(true);
+ let err = loader.load_page_index(&mut reader).await.unwrap_err();
assert_eq!(
err.to_string(),