This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new d1d2c9561c Clean up `ArrowReaderMetadata::load_async` (#7369)
d1d2c9561c is described below

commit d1d2c9561ce64890e9fd97fa74b9c1194a0e6804
Author: Ed Seidl <[email protected]>
AuthorDate: Fri Apr 4 15:26:54 2025 -0700

    Clean up `ArrowReaderMetadata::load_async` (#7369)
    
    * clean up async parquet reader
    
    * format and clippy
    
    * remove outdated documentation per review suggestion
---
 parquet/src/arrow/async_reader/mod.rs | 188 +++++++++++-----------------------
 1 file changed, 62 insertions(+), 126 deletions(-)

diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 5d5a7036ee..6ce33c784e 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -53,7 +53,6 @@ use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
-use crate::file::FOOTER_SIZE;
 use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, 
BloomFilterHash};
 
 mod metadata;
@@ -126,6 +125,18 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
     }
 }
 
+impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> 
MetadataSuffixFetch for T {
+    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
+        async move {
+            self.seek(SeekFrom::End(-(suffix as i64))).await?;
+            let mut buf = Vec::with_capacity(suffix);
+            self.take(suffix as _).read_to_end(&mut buf).await?;
+            Ok(buf.into())
+        }
+        .boxed()
+    }
+}
+
 impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, 
Result<Bytes>> {
         async move {
@@ -147,31 +158,16 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
         &'a mut self,
         options: Option<&'a ArrowReaderOptions>,
     ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
-        const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
         async move {
-            self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;
-
-            let mut buf = [0_u8; FOOTER_SIZE];
-            self.read_exact(&mut buf).await?;
-
-            let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
-            let metadata_len = footer.metadata_length();
-
-            self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
-                .await?;
-
-            let mut buf = Vec::with_capacity(metadata_len);
-            self.take(metadata_len as _).read_to_end(&mut buf).await?;
-
-            let metadata_reader = ParquetMetaDataReader::new();
+            let metadata_reader = ParquetMetaDataReader::new()
+                .with_page_indexes(options.is_some_and(|o| o.page_index));
 
             #[cfg(feature = "encryption")]
             let metadata_reader = metadata_reader.with_decryption_properties(
                 options.and_then(|o| o.file_decryption_properties.as_ref()),
             );
 
-            let parquet_metadata = 
metadata_reader.decode_footer_metadata(&buf, &footer)?;
-
+            let parquet_metadata = 
metadata_reader.load_via_suffix_and_finish(self).await?;
             Ok(Arc::new(parquet_metadata))
         }
         .boxed()
@@ -182,36 +178,11 @@ impl ArrowReaderMetadata {
     /// Returns a new [`ArrowReaderMetadata`] for this builder
     ///
     /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how 
this can be used
-    ///
-    /// # Notes
-    ///
-    /// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
-    /// `Self::metadata` is missing the page index, this function will attempt
-    /// to load the page index by making an object store request.
     pub async fn load_async<T: AsyncFileReader>(
         input: &mut T,
         options: ArrowReaderOptions,
     ) -> Result<Self> {
-        // TODO: this is all rather awkward. It would be nice if 
AsyncFileReader::get_metadata
-        // took an argument to fetch the page indexes.
-        let mut metadata = input.get_metadata(Some(&options)).await?;
-
-        if options.page_index
-            && metadata.column_index().is_none()
-            && metadata.offset_index().is_none()
-        {
-            let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| 
e.as_ref().clone());
-            let mut reader = 
ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
-
-            #[cfg(feature = "encryption")]
-            {
-                reader =
-                    
reader.with_decryption_properties(options.file_decryption_properties.as_ref());
-            }
-
-            reader.load_page_index(input).await?;
-            metadata = Arc::new(reader.finish()?)
-        }
+        let metadata = input.get_metadata(Some(&options)).await?;
         Self::try_new(metadata, options)
     }
 }
@@ -1120,7 +1091,7 @@ mod tests {
     #[derive(Clone)]
     struct TestReader {
         data: Bytes,
-        metadata: Arc<ParquetMetaData>,
+        metadata: Option<Arc<ParquetMetaData>>,
         requests: Arc<Mutex<Vec<Range<usize>>>>,
     }
 
@@ -1132,9 +1103,14 @@ mod tests {
 
         fn get_metadata<'a>(
             &'a mut self,
-            _options: Option<&'a ArrowReaderOptions>,
+            options: Option<&'a ArrowReaderOptions>,
         ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
-            futures::future::ready(Ok(self.metadata.clone())).boxed()
+            let metadata_reader = ParquetMetaDataReader::new()
+                .with_page_indexes(options.is_some_and(|o| o.page_index));
+            self.metadata = Some(Arc::new(
+                metadata_reader.parse_and_finish(&self.data).unwrap(),
+            ));
+            
futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
         }
     }
 
@@ -1144,16 +1120,9 @@ mod tests {
         let path = format!("{testdata}/alltypes_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = ParquetMetaDataReader::new()
-            .parse_and_finish(&data)
-            .unwrap();
-        let metadata = Arc::new(metadata);
-
-        assert_eq!(metadata.num_row_groups(), 1);
-
         let async_reader = TestReader {
             data: data.clone(),
-            metadata: metadata.clone(),
+            metadata: Default::default(),
             requests: Default::default(),
         };
 
@@ -1162,6 +1131,9 @@ mod tests {
             .await
             .unwrap();
 
+        let metadata = builder.metadata().clone();
+        assert_eq!(metadata.num_row_groups(), 1);
+
         let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 
2]);
         let stream = builder
             .with_projection(mask.clone())
@@ -1201,16 +1173,9 @@ mod tests {
         let path = format!("{testdata}/alltypes_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = ParquetMetaDataReader::new()
-            .parse_and_finish(&data)
-            .unwrap();
-        let metadata = Arc::new(metadata);
-
-        assert_eq!(metadata.num_row_groups(), 1);
-
         let async_reader = TestReader {
             data: data.clone(),
-            metadata: metadata.clone(),
+            metadata: Default::default(),
             requests: Default::default(),
         };
 
@@ -1219,6 +1184,9 @@ mod tests {
             .await
             .unwrap();
 
+        let metadata = builder.metadata().clone();
+        assert_eq!(metadata.num_row_groups(), 1);
+
         let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 
2]);
         let mut stream = builder
             .with_projection(mask.clone())
@@ -1266,16 +1234,9 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = ParquetMetaDataReader::new()
-            .parse_and_finish(&data)
-            .unwrap();
-        let metadata = Arc::new(metadata);
-
-        assert_eq!(metadata.num_row_groups(), 1);
-
         let async_reader = TestReader {
             data: data.clone(),
-            metadata: metadata.clone(),
+            metadata: Default::default(),
             requests: Default::default(),
         };
 
@@ -1286,6 +1247,7 @@ mod tests {
 
         // The builder should have page and offset indexes loaded now
         let metadata_with_index = builder.metadata();
+        assert_eq!(metadata_with_index.num_row_groups(), 1);
 
         // Check offset indexes are present for all columns
         let offset_index = metadata_with_index.offset_index().unwrap();
@@ -1343,7 +1305,7 @@ mod tests {
 
         let async_reader = TestReader {
             data: data.clone(),
-            metadata: metadata.clone(),
+            metadata: Default::default(),
             requests: Default::default(),
         };
 
@@ -1351,6 +1313,8 @@ mod tests {
             .await
             .unwrap();
 
+        assert_eq!(builder.metadata().num_row_groups(), 1);
+
         let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 
2]);
         let stream = builder
             .with_projection(mask.clone())
@@ -1380,16 +1344,9 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = ParquetMetaDataReader::new()
-            .parse_and_finish(&data)
-            .unwrap();
-        let metadata = Arc::new(metadata);
-
-        assert_eq!(metadata.num_row_groups(), 1);
-
         let async_reader = TestReader {
             data: data.clone(),
-            metadata: metadata.clone(),
+            metadata: Default::default(),
             requests: Default::default(),
         };
 
@@ -1398,6 +1355,8 @@ mod tests {
             .await
             .unwrap();
 
+        assert_eq!(builder.metadata().num_row_groups(), 1);
+
         let selection = RowSelection::from(vec![
             RowSelector::skip(21),   // Skip first page
             RowSelector::select(21), // Select page to boundary
@@ -1438,13 +1397,6 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = ParquetMetaDataReader::new()
-            .parse_and_finish(&data)
-            .unwrap();
-        let metadata = Arc::new(metadata);
-
-        assert_eq!(metadata.num_row_groups(), 1);
-
         let mut rand = rng();
 
         for _ in 0..100 {
@@ -1472,7 +1424,7 @@ mod tests {
 
             let async_reader = TestReader {
                 data: data.clone(),
-                metadata: metadata.clone(),
+                metadata: Default::default(),
                 requests: Default::default(),
             };
 
@@ -1481,6 +1433,8 @@ mod tests {
                 .await
                 .unwrap();
 
+            assert_eq!(builder.metadata().num_row_groups(), 1);
+
             let col_idx: usize = rand.random_range(0..13);
             let mask = ProjectionMask::leaves(builder.parquet_schema(), 
vec![col_idx]);
 
@@ -1505,13 +1459,6 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = ParquetMetaDataReader::new()
-            .parse_and_finish(&data)
-            .unwrap();
-        let metadata = Arc::new(metadata);
-
-        assert_eq!(metadata.num_row_groups(), 1);
-
         let mut rand = rng();
 
         let mut expected_rows = 0;
@@ -1543,7 +1490,7 @@ mod tests {
 
         let async_reader = TestReader {
             data: data.clone(),
-            metadata: metadata.clone(),
+            metadata: Default::default(),
             requests: Default::default(),
         };
 
@@ -1552,6 +1499,8 @@ mod tests {
             .await
             .unwrap();
 
+        assert_eq!(builder.metadata().num_row_groups(), 1);
+
         let col_idx: usize = rand.random_range(0..13);
         let mask = ProjectionMask::leaves(builder.parquet_schema(), 
vec![col_idx]);
 
@@ -1593,7 +1542,7 @@ mod tests {
 
         let test = TestReader {
             data,
-            metadata: Arc::new(metadata),
+            metadata: Default::default(),
             requests: Default::default(),
         };
         let requests = test.requests.clone();
@@ -1670,7 +1619,7 @@ mod tests {
 
         let test = TestReader {
             data,
-            metadata: Arc::new(metadata),
+            metadata: Default::default(),
             requests: Default::default(),
         };
 
@@ -1756,13 +1705,12 @@ mod tests {
             .parse_and_finish(&data)
             .unwrap();
         let parquet_schema = metadata.file_metadata().schema_descr_ptr();
-        let metadata = Arc::new(metadata);
 
         assert_eq!(metadata.num_row_groups(), 1);
 
         let async_reader = TestReader {
             data: data.clone(),
-            metadata: metadata.clone(),
+            metadata: Default::default(),
             requests: Default::default(),
         };
 
@@ -1830,7 +1778,7 @@ mod tests {
 
         let async_reader = TestReader {
             data: data.clone(),
-            metadata: metadata.clone(),
+            metadata: Default::default(),
             requests: Default::default(),
         };
 
@@ -1898,15 +1846,9 @@ mod tests {
         let path = format!("{testdata}/alltypes_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let metadata = ParquetMetaDataReader::new()
-            .parse_and_finish(&data)
-            .unwrap();
-        let file_rows = metadata.file_metadata().num_rows() as usize;
-        let metadata = Arc::new(metadata);
-
         let async_reader = TestReader {
             data: data.clone(),
-            metadata: metadata.clone(),
+            metadata: Default::default(),
             requests: Default::default(),
         };
 
@@ -1914,6 +1856,8 @@ mod tests {
             .await
             .unwrap();
 
+        let file_rows = builder.metadata().file_metadata().num_rows() as usize;
+
         let stream = builder
             .with_projection(ProjectionMask::all())
             .with_batch_size(1024)
@@ -2045,13 +1989,9 @@ mod tests {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = 
format!("{testdata}/data_index_bloom_encoding_stats.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
-        let metadata = ParquetMetaDataReader::new()
-            .parse_and_finish(&data)
-            .unwrap();
-        let metadata = Arc::new(metadata);
         let async_reader = TestReader {
             data: data.clone(),
-            metadata: metadata.clone(),
+            metadata: Default::default(),
             requests: Default::default(),
         };
         let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
@@ -2076,19 +2016,9 @@ mod tests {
     }
 
     async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: 
bool) {
-        let metadata = ParquetMetaDataReader::new()
-            .parse_and_finish(&data)
-            .unwrap();
-        let metadata = Arc::new(metadata);
-
-        assert_eq!(metadata.num_row_groups(), 1);
-        let row_group = metadata.row_group(0);
-        let column = row_group.column(0);
-        assert_eq!(column.bloom_filter_length().is_some(), with_length);
-
         let async_reader = TestReader {
             data: data.clone(),
-            metadata: metadata.clone(),
+            metadata: Default::default(),
             requests: Default::default(),
         };
 
@@ -2096,6 +2026,12 @@ mod tests {
             .await
             .unwrap();
 
+        let metadata = builder.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+        let row_group = metadata.row_group(0);
+        let column = row_group.column(0);
+        assert_eq!(column.bloom_filter_length().is_some(), with_length);
+
         let sbbf = builder
             .get_row_group_column_bloom_filter(0, 0)
             .await
@@ -2225,7 +2161,7 @@ mod tests {
 
         let test = TestReader {
             data,
-            metadata: Arc::new(metadata),
+            metadata: Default::default(),
             requests: Default::default(),
         };
         let requests = test.requests.clone();

Reply via email to