This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new d1d2c9561c Clean up `ArrowReaderMetadata::load_async` (#7369)
d1d2c9561c is described below
commit d1d2c9561ce64890e9fd97fa74b9c1194a0e6804
Author: Ed Seidl <[email protected]>
AuthorDate: Fri Apr 4 15:26:54 2025 -0700
Clean up `ArrowReaderMetadata::load_async` (#7369)
* clean up async parquet reader
* format and clippy
* remove outdated documentation per review suggestion
---
parquet/src/arrow/async_reader/mod.rs | 188 +++++++++++-----------------------
1 file changed, 62 insertions(+), 126 deletions(-)
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 5d5a7036ee..6ce33c784e 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -53,7 +53,6 @@ use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
-use crate::file::FOOTER_SIZE;
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
mod metadata;
@@ -126,6 +125,18 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
}
}
+impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
+ fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
+ async move {
+ self.seek(SeekFrom::End(-(suffix as i64))).await?;
+ let mut buf = Vec::with_capacity(suffix);
+ self.take(suffix as _).read_to_end(&mut buf).await?;
+ Ok(buf.into())
+ }
+ .boxed()
+ }
+}
+
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
async move {
@@ -147,31 +158,16 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
&'a mut self,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
- const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
async move {
- self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;
-
- let mut buf = [0_u8; FOOTER_SIZE];
- self.read_exact(&mut buf).await?;
-
- let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
- let metadata_len = footer.metadata_length();
-
- self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
- .await?;
-
- let mut buf = Vec::with_capacity(metadata_len);
- self.take(metadata_len as _).read_to_end(&mut buf).await?;
-
- let metadata_reader = ParquetMetaDataReader::new();
+ let metadata_reader = ParquetMetaDataReader::new()
+ .with_page_indexes(options.is_some_and(|o| o.page_index));
#[cfg(feature = "encryption")]
let metadata_reader = metadata_reader.with_decryption_properties(
options.and_then(|o| o.file_decryption_properties.as_ref()),
);
- let parquet_metadata = metadata_reader.decode_footer_metadata(&buf, &footer)?;
-
+ let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
Ok(Arc::new(parquet_metadata))
}
.boxed()
@@ -182,36 +178,11 @@ impl ArrowReaderMetadata {
/// Returns a new [`ArrowReaderMetadata`] for this builder
///
/// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used
- ///
- /// # Notes
- ///
- /// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
- /// `Self::metadata` is missing the page index, this function will attempt
- /// to load the page index by making an object store request.
pub async fn load_async<T: AsyncFileReader>(
input: &mut T,
options: ArrowReaderOptions,
) -> Result<Self> {
- // TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata
- // took an argument to fetch the page indexes.
- let mut metadata = input.get_metadata(Some(&options)).await?;
-
- if options.page_index
- && metadata.column_index().is_none()
- && metadata.offset_index().is_none()
- {
- let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
- let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
-
- #[cfg(feature = "encryption")]
- {
- reader =
- reader.with_decryption_properties(options.file_decryption_properties.as_ref());
- }
-
- reader.load_page_index(input).await?;
- metadata = Arc::new(reader.finish()?)
- }
+ let metadata = input.get_metadata(Some(&options)).await?;
Self::try_new(metadata, options)
}
}
@@ -1120,7 +1091,7 @@ mod tests {
#[derive(Clone)]
struct TestReader {
data: Bytes,
- metadata: Arc<ParquetMetaData>,
+ metadata: Option<Arc<ParquetMetaData>>,
requests: Arc<Mutex<Vec<Range<usize>>>>,
}
@@ -1132,9 +1103,14 @@ mod tests {
fn get_metadata<'a>(
&'a mut self,
- _options: Option<&'a ArrowReaderOptions>,
+ options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
- futures::future::ready(Ok(self.metadata.clone())).boxed()
+ let metadata_reader = ParquetMetaDataReader::new()
+ .with_page_indexes(options.is_some_and(|o| o.page_index));
+ self.metadata = Some(Arc::new(
+ metadata_reader.parse_and_finish(&self.data).unwrap(),
+ ));
+ futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed()
}
}
@@ -1144,16 +1120,9 @@ mod tests {
let path = format!("{testdata}/alltypes_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
- let metadata = ParquetMetaDataReader::new()
- .parse_and_finish(&data)
- .unwrap();
- let metadata = Arc::new(metadata);
-
- assert_eq!(metadata.num_row_groups(), 1);
-
let async_reader = TestReader {
data: data.clone(),
- metadata: metadata.clone(),
+ metadata: Default::default(),
requests: Default::default(),
};
@@ -1162,6 +1131,9 @@ mod tests {
.await
.unwrap();
+ let metadata = builder.metadata().clone();
+ assert_eq!(metadata.num_row_groups(), 1);
+
let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
let stream = builder
.with_projection(mask.clone())
@@ -1201,16 +1173,9 @@ mod tests {
let path = format!("{testdata}/alltypes_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
- let metadata = ParquetMetaDataReader::new()
- .parse_and_finish(&data)
- .unwrap();
- let metadata = Arc::new(metadata);
-
- assert_eq!(metadata.num_row_groups(), 1);
-
let async_reader = TestReader {
data: data.clone(),
- metadata: metadata.clone(),
+ metadata: Default::default(),
requests: Default::default(),
};
@@ -1219,6 +1184,9 @@ mod tests {
.await
.unwrap();
+ let metadata = builder.metadata().clone();
+ assert_eq!(metadata.num_row_groups(), 1);
+
let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
let mut stream = builder
.with_projection(mask.clone())
@@ -1266,16 +1234,9 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
- let metadata = ParquetMetaDataReader::new()
- .parse_and_finish(&data)
- .unwrap();
- let metadata = Arc::new(metadata);
-
- assert_eq!(metadata.num_row_groups(), 1);
-
let async_reader = TestReader {
data: data.clone(),
- metadata: metadata.clone(),
+ metadata: Default::default(),
requests: Default::default(),
};
@@ -1286,6 +1247,7 @@ mod tests {
// The builder should have page and offset indexes loaded now
let metadata_with_index = builder.metadata();
+ assert_eq!(metadata_with_index.num_row_groups(), 1);
// Check offset indexes are present for all columns
let offset_index = metadata_with_index.offset_index().unwrap();
@@ -1343,7 +1305,7 @@ mod tests {
let async_reader = TestReader {
data: data.clone(),
- metadata: metadata.clone(),
+ metadata: Default::default(),
requests: Default::default(),
};
@@ -1351,6 +1313,8 @@ mod tests {
.await
.unwrap();
+ assert_eq!(builder.metadata().num_row_groups(), 1);
+
let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
let stream = builder
.with_projection(mask.clone())
@@ -1380,16 +1344,9 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
- let metadata = ParquetMetaDataReader::new()
- .parse_and_finish(&data)
- .unwrap();
- let metadata = Arc::new(metadata);
-
- assert_eq!(metadata.num_row_groups(), 1);
-
let async_reader = TestReader {
data: data.clone(),
- metadata: metadata.clone(),
+ metadata: Default::default(),
requests: Default::default(),
};
@@ -1398,6 +1355,8 @@ mod tests {
.await
.unwrap();
+ assert_eq!(builder.metadata().num_row_groups(), 1);
+
let selection = RowSelection::from(vec![
RowSelector::skip(21), // Skip first page
RowSelector::select(21), // Select page to boundary
@@ -1438,13 +1397,6 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
- let metadata = ParquetMetaDataReader::new()
- .parse_and_finish(&data)
- .unwrap();
- let metadata = Arc::new(metadata);
-
- assert_eq!(metadata.num_row_groups(), 1);
-
let mut rand = rng();
for _ in 0..100 {
@@ -1472,7 +1424,7 @@ mod tests {
let async_reader = TestReader {
data: data.clone(),
- metadata: metadata.clone(),
+ metadata: Default::default(),
requests: Default::default(),
};
@@ -1481,6 +1433,8 @@ mod tests {
.await
.unwrap();
+ assert_eq!(builder.metadata().num_row_groups(), 1);
+
let col_idx: usize = rand.random_range(0..13);
let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
@@ -1505,13 +1459,6 @@ mod tests {
let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
- let metadata = ParquetMetaDataReader::new()
- .parse_and_finish(&data)
- .unwrap();
- let metadata = Arc::new(metadata);
-
- assert_eq!(metadata.num_row_groups(), 1);
-
let mut rand = rng();
let mut expected_rows = 0;
@@ -1543,7 +1490,7 @@ mod tests {
let async_reader = TestReader {
data: data.clone(),
- metadata: metadata.clone(),
+ metadata: Default::default(),
requests: Default::default(),
};
@@ -1552,6 +1499,8 @@ mod tests {
.await
.unwrap();
+ assert_eq!(builder.metadata().num_row_groups(), 1);
+
let col_idx: usize = rand.random_range(0..13);
let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
@@ -1593,7 +1542,7 @@ mod tests {
let test = TestReader {
data,
- metadata: Arc::new(metadata),
+ metadata: Default::default(),
requests: Default::default(),
};
let requests = test.requests.clone();
@@ -1670,7 +1619,7 @@ mod tests {
let test = TestReader {
data,
- metadata: Arc::new(metadata),
+ metadata: Default::default(),
requests: Default::default(),
};
@@ -1756,13 +1705,12 @@ mod tests {
.parse_and_finish(&data)
.unwrap();
let parquet_schema = metadata.file_metadata().schema_descr_ptr();
- let metadata = Arc::new(metadata);
assert_eq!(metadata.num_row_groups(), 1);
let async_reader = TestReader {
data: data.clone(),
- metadata: metadata.clone(),
+ metadata: Default::default(),
requests: Default::default(),
};
@@ -1830,7 +1778,7 @@ mod tests {
let async_reader = TestReader {
data: data.clone(),
- metadata: metadata.clone(),
+ metadata: Default::default(),
requests: Default::default(),
};
@@ -1898,15 +1846,9 @@ mod tests {
let path = format!("{testdata}/alltypes_plain.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
- let metadata = ParquetMetaDataReader::new()
- .parse_and_finish(&data)
- .unwrap();
- let file_rows = metadata.file_metadata().num_rows() as usize;
- let metadata = Arc::new(metadata);
-
let async_reader = TestReader {
data: data.clone(),
- metadata: metadata.clone(),
+ metadata: Default::default(),
requests: Default::default(),
};
@@ -1914,6 +1856,8 @@ mod tests {
.await
.unwrap();
+ let file_rows = builder.metadata().file_metadata().num_rows() as usize;
+
let stream = builder
.with_projection(ProjectionMask::all())
.with_batch_size(1024)
@@ -2045,13 +1989,9 @@ mod tests {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
- let metadata = ParquetMetaDataReader::new()
- .parse_and_finish(&data)
- .unwrap();
- let metadata = Arc::new(metadata);
let async_reader = TestReader {
data: data.clone(),
- metadata: metadata.clone(),
+ metadata: Default::default(),
requests: Default::default(),
};
let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
@@ -2076,19 +2016,9 @@ mod tests {
}
async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
- let metadata = ParquetMetaDataReader::new()
- .parse_and_finish(&data)
- .unwrap();
- let metadata = Arc::new(metadata);
-
- assert_eq!(metadata.num_row_groups(), 1);
- let row_group = metadata.row_group(0);
- let column = row_group.column(0);
- assert_eq!(column.bloom_filter_length().is_some(), with_length);
-
let async_reader = TestReader {
data: data.clone(),
- metadata: metadata.clone(),
+ metadata: Default::default(),
requests: Default::default(),
};
@@ -2096,6 +2026,12 @@ mod tests {
.await
.unwrap();
+ let metadata = builder.metadata();
+ assert_eq!(metadata.num_row_groups(), 1);
+ let row_group = metadata.row_group(0);
+ let column = row_group.column(0);
+ assert_eq!(column.bloom_filter_length().is_some(), with_length);
+
let sbbf = builder
.get_row_group_column_bloom_filter(0, 0)
.await
@@ -2225,7 +2161,7 @@ mod tests {
let test = TestReader {
data,
- metadata: Arc::new(metadata),
+ metadata: Default::default(),
requests: Default::default(),
};
let requests = test.requests.clone();