This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 3293a8c2f9 Deprecate `MetadataLoader` (#6474)
3293a8c2f9 is described below

commit 3293a8c2f9062fca93bee2210d540a1d25155bf5
Author: Ed Seidl <[email protected]>
AuthorDate: Mon Sep 30 11:11:22 2024 -0700

    Deprecate `MetadataLoader` (#6474)
    
    * deprecate MetadataLoader
    
    * change signature of the load functions
    
    * fix up fetch_parquet_metadata
    
    * can now use self.meta.size directly
    
    * revert changes to load API
    
    * revert change to test code
---
 parquet/src/arrow/async_reader/metadata.rs | 12 ++++++--
 parquet/src/arrow/async_reader/mod.rs      |  8 +++--
 parquet/src/arrow/async_reader/store.rs    | 17 +++++------
 parquet/src/file/metadata/reader.rs        | 48 ++++++++++++++----------------
 parquet/src/file/metadata/writer.rs        | 12 ++++----
 parquet/tests/arrow_reader/bad_data.rs     | 17 ++++++-----
 6 files changed, 60 insertions(+), 54 deletions(-)

diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs
index cd45d2abdb..b7fac6fe7c 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -52,6 +52,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
     /// Create a new [`MetadataLoader`] by reading the footer information
     ///
     /// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
+    #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
     pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
         if file_size < FOOTER_SIZE {
             return Err(ParquetError::EOF(format!(
@@ -108,6 +109,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
     }
 
     /// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
+    #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
     pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
         Self {
             fetch,
@@ -120,6 +122,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
     ///
     /// * `column_index`: if true will load column index
     /// * `offset_index`: if true will load offset index
+    #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
     pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> {
         if !column_index && !offset_index {
             return Ok(());
@@ -226,6 +229,7 @@ where
 /// in the first request, instead of 8, and only issue further requests
 /// if additional bytes are needed. Providing a `prefetch` hint can therefore
 /// significantly reduce the number of `fetch` requests, and consequently latency
+#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
 pub async fn fetch_parquet_metadata<F, Fut>(
     fetch: F,
     file_size: usize,
@@ -236,10 +240,14 @@ where
     Fut: Future<Output = Result<Bytes>> + Send,
 {
     let fetch = MetadataFetchFn(fetch);
-    let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
-    Ok(loader.finish())
+    ParquetMetaDataReader::new()
+        .with_prefetch_hint(prefetch)
+        .load_and_finish(fetch, file_size)
+        .await
 }
 
+// these tests are all replicated in parquet::file::metadata::reader
+#[allow(deprecated)]
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 89e4d6adb5..5e8bdbc02e 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -212,6 +212,8 @@ impl ArrowReaderMetadata {
         input: &mut T,
         options: ArrowReaderOptions,
     ) -> Result<Self> {
+        // TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata
+        // took an argument to fetch the page indexes.
         let mut metadata = input.get_metadata().await?;
 
         if options.page_index
@@ -219,9 +221,9 @@ impl ArrowReaderMetadata {
             && metadata.offset_index().is_none()
         {
             let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
-            let mut loader = MetadataLoader::new(input, m);
-            loader.load_page_index(true, true).await?;
-            metadata = Arc::new(loader.finish())
+            let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
+            reader.load_page_index(input).await?;
+            metadata = Arc::new(reader.finish()?)
         }
         Self::try_new(metadata, options)
     }
diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs
index 77c00e91a3..e6b47856eb 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -24,9 +24,9 @@ use futures::{FutureExt, TryFutureExt};
 
 use object_store::{ObjectMeta, ObjectStore};
 
-use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader};
+use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::Result;
-use crate::file::metadata::ParquetMetaData;
+use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 
 /// Reads Parquet files in object storage using [`ObjectStore`].
 ///
@@ -124,15 +124,14 @@ impl AsyncFileReader for ParquetObjectReader {
 
     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
         Box::pin(async move {
-            let preload_column_index = self.preload_column_index;
-            let preload_offset_index = self.preload_offset_index;
             let file_size = self.meta.size;
-            let prefetch = self.metadata_size_hint;
-            let mut loader = MetadataLoader::load(self, file_size, prefetch).await?;
-            loader
-                .load_page_index(preload_column_index, preload_offset_index)
+            let metadata = ParquetMetaDataReader::new()
+                .with_column_indexes(self.preload_column_index)
+                .with_offset_indexes(self.preload_offset_index)
+                .with_prefetch_hint(self.metadata_size_hint)
+                .load_and_finish(self, file_size)
                 .await?;
-            Ok(Arc::new(loader.finish()))
+            Ok(Arc::new(metadata))
         })
     }
 }
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index 9e00c68604..3fd2bd76f6 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -329,13 +329,18 @@ impl ParquetMetaDataReader {
             return Ok(());
         }
 
-        self.load_page_index(fetch, remainder).await
+        self.load_page_index_with_remainder(fetch, remainder).await
     }
 
     /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
     /// been obtained. See [`Self::new_with_metadata()`].
     #[cfg(feature = "async")]
-    pub async fn load_page_index<F: MetadataFetch>(
+    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
+        self.load_page_index_with_remainder(fetch, None).await
+    }
+
+    #[cfg(feature = "async")]
+    async fn load_page_index_with_remainder<F: MetadataFetch>(
         &mut self,
         mut fetch: F,
         remainder: Option<(usize, Bytes)>,
@@ -836,7 +841,7 @@ mod async_tests {
 
     struct MetadataFetchFn<F>(F);
 
-    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
+    impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn<F>
     where
         F: FnMut(Range<usize>) -> Fut + Send,
         Fut: Future<Output = Result<Bytes>> + Send,
@@ -865,14 +870,14 @@ mod async_tests {
         let expected = expected.file_metadata().schema();
         let fetch_count = AtomicUsize::new(0);
 
-        let mut fetch = |range| {
+        let fetch = |range| {
             fetch_count.fetch_add(1, Ordering::SeqCst);
             futures::future::ready(read_range(&mut file, range))
         };
 
-        let input = MetadataFetchFn(&mut fetch);
+        let mut f = MetadataFetchFn(fetch);
         let actual = ParquetMetaDataReader::new()
-            .load_and_finish(input, len)
+            .load_and_finish(&mut f, len)
             .await
             .unwrap();
         assert_eq!(actual.file_metadata().schema(), expected);
@@ -880,10 +885,9 @@ mod async_tests {
 
         // Metadata hint too small - below footer size
         fetch_count.store(0, Ordering::SeqCst);
-        let input = MetadataFetchFn(&mut fetch);
         let actual = ParquetMetaDataReader::new()
             .with_prefetch_hint(Some(7))
-            .load_and_finish(input, len)
+            .load_and_finish(&mut f, len)
             .await
             .unwrap();
         assert_eq!(actual.file_metadata().schema(), expected);
@@ -891,10 +895,9 @@ mod async_tests {
 
         // Metadata hint too small
         fetch_count.store(0, Ordering::SeqCst);
-        let input = MetadataFetchFn(&mut fetch);
         let actual = ParquetMetaDataReader::new()
             .with_prefetch_hint(Some(10))
-            .load_and_finish(input, len)
+            .load_and_finish(&mut f, len)
             .await
             .unwrap();
         assert_eq!(actual.file_metadata().schema(), expected);
@@ -902,10 +905,9 @@ mod async_tests {
 
         // Metadata hint too large
         fetch_count.store(0, Ordering::SeqCst);
-        let input = MetadataFetchFn(&mut fetch);
         let actual = ParquetMetaDataReader::new()
             .with_prefetch_hint(Some(500))
-            .load_and_finish(input, len)
+            .load_and_finish(&mut f, len)
             .await
             .unwrap();
         assert_eq!(actual.file_metadata().schema(), expected);
@@ -913,26 +915,23 @@ mod async_tests {
 
         // Metadata hint exactly correct
         fetch_count.store(0, Ordering::SeqCst);
-        let input = MetadataFetchFn(&mut fetch);
         let actual = ParquetMetaDataReader::new()
             .with_prefetch_hint(Some(428))
-            .load_and_finish(input, len)
+            .load_and_finish(&mut f, len)
             .await
             .unwrap();
         assert_eq!(actual.file_metadata().schema(), expected);
         assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
 
-        let input = MetadataFetchFn(&mut fetch);
         let err = ParquetMetaDataReader::new()
-            .load_and_finish(input, 4)
+            .load_and_finish(&mut f, 4)
             .await
             .unwrap_err()
             .to_string();
         assert_eq!(err, "EOF: file size of 4 is less than footer");
 
-        let input = MetadataFetchFn(&mut fetch);
         let err = ParquetMetaDataReader::new()
-            .load_and_finish(input, 20)
+            .load_and_finish(&mut f, 20)
             .await
             .unwrap_err()
             .to_string();
@@ -949,42 +948,39 @@ mod async_tests {
             futures::future::ready(read_range(&mut file, range))
         };
 
-        let f = MetadataFetchFn(&mut fetch);
+        let mut f = MetadataFetchFn(&mut fetch);
         let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
-        loader.try_load(f, len).await.unwrap();
+        loader.try_load(&mut f, len).await.unwrap();
         assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
         let metadata = loader.finish().unwrap();
         assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
 
         // Prefetch just footer exactly
         fetch_count.store(0, Ordering::SeqCst);
-        let f = MetadataFetchFn(&mut fetch);
         let mut loader = ParquetMetaDataReader::new()
             .with_page_indexes(true)
             .with_prefetch_hint(Some(1729));
-        loader.try_load(f, len).await.unwrap();
+        loader.try_load(&mut f, len).await.unwrap();
         assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
         let metadata = loader.finish().unwrap();
         assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
 
         // Prefetch more than footer but not enough
         fetch_count.store(0, Ordering::SeqCst);
-        let f = MetadataFetchFn(&mut fetch);
         let mut loader = ParquetMetaDataReader::new()
             .with_page_indexes(true)
             .with_prefetch_hint(Some(130649));
-        loader.try_load(f, len).await.unwrap();
+        loader.try_load(&mut f, len).await.unwrap();
         assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
         let metadata = loader.finish().unwrap();
         assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
 
         // Prefetch exactly enough
         fetch_count.store(0, Ordering::SeqCst);
-        let f = MetadataFetchFn(&mut fetch);
         let metadata = ParquetMetaDataReader::new()
             .with_page_indexes(true)
             .with_prefetch_hint(Some(130650))
-            .load_and_finish(f, len)
+            .load_and_finish(&mut f, len)
             .await
             .unwrap();
         assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs
index db78606e42..44328c635f 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -516,7 +516,7 @@ mod tests {
     /// Temporary function so we can test loading metadata with page indexes
     /// while we haven't fully figured out how to load it cleanly
     async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
-        use crate::arrow::async_reader::{MetadataFetch, MetadataLoader};
+        use crate::arrow::async_reader::MetadataFetch;
         use crate::errors::Result as ParquetResult;
         use futures::future::BoxFuture;
         use futures::FutureExt;
@@ -569,13 +569,11 @@ mod tests {
             Box::new(AsyncBytes::new(data)),
             file_size - metadata_length..file_size,
         );
-        let metadata = MetadataLoader::load(&mut reader, file_size, None)
+        ParquetMetaDataReader::new()
+            .with_page_indexes(true)
+            .load_and_finish(&mut reader, file_size)
             .await
-            .unwrap();
-        let loaded_metadata = metadata.finish();
-        let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
-        metadata.load_page_index(true, true).await.unwrap();
-        metadata.finish()
+            .unwrap()
     }
 
     fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
diff --git a/parquet/tests/arrow_reader/bad_data.rs b/parquet/tests/arrow_reader/bad_data.rs
index a73864070d..e2975c17c8 100644
--- a/parquet/tests/arrow_reader/bad_data.rs
+++ b/parquet/tests/arrow_reader/bad_data.rs
@@ -140,20 +140,23 @@ fn read_file(name: &str) -> Result<usize, ParquetError> {
 #[tokio::test]
 async fn bad_metadata_err() {
     use bytes::Bytes;
-    use parquet::arrow::async_reader::MetadataLoader;
+    use parquet::file::metadata::ParquetMetaDataReader;
 
     let metadata_buffer = Bytes::from_static(include_bytes!("bad_raw_metadata.bin"));
 
     let metadata_length = metadata_buffer.len();
 
     let mut reader = std::io::Cursor::new(&metadata_buffer);
-    let mut loader = MetadataLoader::load(&mut reader, metadata_length, None)
-        .await
-        .unwrap();
-    loader.load_page_index(false, false).await.unwrap();
-    loader.load_page_index(false, true).await.unwrap();
+    let mut loader = ParquetMetaDataReader::new();
+    loader.try_load(&mut reader, metadata_length).await.unwrap();
+    loader = loader.with_page_indexes(false);
+    loader.load_page_index(&mut reader).await.unwrap();
 
-    let err = loader.load_page_index(true, false).await.unwrap_err();
+    loader = loader.with_offset_indexes(true);
+    loader.load_page_index(&mut reader).await.unwrap();
+
+    loader = loader.with_column_indexes(true);
+    let err = loader.load_page_index(&mut reader).await.unwrap_err();
 
     assert_eq!(
         err.to_string(),

Reply via email to