This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new be0d34d4f Add get_byte_ranges method to AsyncFileReader trait (#2115)
be0d34d4f is described below
commit be0d34d4fd37431ed57e7ab001da5f55a016a76e
Author: Dan Harris <[email protected]>
AuthorDate: Thu Jul 21 08:05:21 2022 -0400
Add get_byte_ranges method to AsyncFileReader trait (#2115)
* Add get_byte_ranges method to AsyncFileReader trait
* Remove overhead
* linting
* pr comments
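
The default implementation simply loops over `get_bytes`, so existing implementations keep working unchanged; the point of the new hook is that readers backed by remote object storage can satisfy all ranges with one vectored or concurrent round of requests instead. A minimal sketch of such an override, assuming a hypothetical `RemoteClient` with a `fetch_range` method (neither is part of this commit):

    use std::ops::Range;
    use std::sync::Arc;

    use bytes::Bytes;
    use futures::future::BoxFuture;
    use futures::FutureExt;
    use parquet::arrow::async_reader::AsyncFileReader;
    use parquet::errors::{ParquetError, Result};
    use parquet::file::metadata::ParquetMetaData;

    /// Hypothetical client for a remote store that can serve
    /// independent GET requests for arbitrary byte ranges.
    struct RemoteClient;

    impl RemoteClient {
        async fn fetch_range(&self, _range: Range<usize>) -> Result<Bytes> {
            // Stub: a real client would issue one request per range here
            Ok(Bytes::new())
        }
    }

    struct RemoteReader {
        client: Arc<RemoteClient>,
    }

    impl AsyncFileReader for RemoteReader {
        fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
            let client = Arc::clone(&self.client);
            async move { client.fetch_range(range).await }.boxed()
        }

        // Override the sequential default: issue every range request
        // concurrently and await them all together
        fn get_byte_ranges(
            &mut self,
            ranges: Vec<Range<usize>>,
        ) -> BoxFuture<'_, Result<Vec<Bytes>>>
        where
            Self: Send,
        {
            let client = Arc::clone(&self.client);
            async move {
                let fetches = ranges.into_iter().map(|range| client.fetch_range(range));
                futures::future::try_join_all(fetches).await
            }
            .boxed()
        }

        fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
            // Metadata sourcing is orthogonal to this sketch
            async move { Err(ParquetError::General("not shown".into())) }.boxed()
        }
    }

Note that a concurrent override has to go through a shared handle such as the `Arc` above: `get_bytes` takes `&mut self`, so the sequential default cannot itself overlap requests.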
---
parquet/src/arrow/async_reader.rs | 60 +++++++++++++++++++++++++++++++--------
1 file changed, 48 insertions(+), 12 deletions(-)
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 923f329ef..19e1de9fc 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -77,6 +77,7 @@
 use std::collections::VecDeque;
 use std::fmt::Formatter;
+
 use std::io::{Cursor, SeekFrom};
 use std::ops::Range;
 use std::pin::Pin;
@@ -111,6 +112,27 @@ pub trait AsyncFileReader {
     /// Retrieve the bytes in `range`
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
 
+    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, Result<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        async move {
+            let mut result = Vec::with_capacity(ranges.len());
+
+            for range in ranges.into_iter() {
+                let data = self.get_bytes(range).await?;
+                result.push(data);
+            }
+
+            Ok(result)
+        }
+        .boxed()
+    }
+
     /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
     /// allowing fine-grained control over how metadata is sourced, in particular allowing
     /// for caching, pre-fetching, catalog metadata, etc...
@@ -367,24 +389,38 @@ where
                         vec![None; row_group_metadata.columns().len()];
 
                     // TODO: Combine consecutive ranges
+                    let fetch_ranges = (0..column_chunks.len())
+                        .into_iter()
+                        .filter_map(|idx| {
+                            if !projection.leaf_included(idx) {
+                                None
+                            } else {
+                                let column = row_group_metadata.column(idx);
+                                let (start, length) = column.byte_range();
+
+                                Some(start as usize..(start + length) as usize)
+                            }
+                        })
+                        .collect();
+
+                    let mut chunk_data =
+                        input.get_byte_ranges(fetch_ranges).await?.into_iter();
+
                     for (idx, chunk) in column_chunks.iter_mut().enumerate() {
                         if !projection.leaf_included(idx) {
                             continue;
                         }
 
                         let column = row_group_metadata.column(idx);
-                        let (start, length) = column.byte_range();
-
-                        let data = input
-                            .get_bytes(start as usize..(start + length) as usize)
-                            .await?;
-
-                        *chunk = Some(InMemoryColumnChunk {
-                            num_values: column.num_values(),
-                            compression: column.compression(),
-                            physical_type: column.column_type(),
-                            data,
-                        });
+
+                        if let Some(data) = chunk_data.next() {
+                            *chunk = Some(InMemoryColumnChunk {
+                                num_values: column.num_values(),
+                                compression: column.compression(),
+                                physical_type: column.column_type(),
+                                data,
+                            });
+                        }
                     }
 
                     Ok((
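
As the `// TODO: Combine consecutive ranges` comment above notes, a natural follow-up is to merge touching ranges before calling `get_byte_ranges`, since column chunks within a row group are typically laid out back-to-back in the file. A rough sketch of such coalescing, not part of this commit (the helper name is hypothetical):

    use std::ops::Range;

    /// Hypothetical helper: merge ranges that touch or overlap,
    /// assuming the input is sorted by start offset (column chunk
    /// order within a row group satisfies this)
    fn coalesce_ranges(ranges: &[Range<usize>]) -> Vec<Range<usize>> {
        let mut out: Vec<Range<usize>> = Vec::with_capacity(ranges.len());
        for range in ranges {
            match out.last_mut() {
                // Extend the previous range when the next one starts
                // at or before its end
                Some(prev) if range.start <= prev.end => {
                    prev.end = prev.end.max(range.end)
                }
                _ => out.push(range.clone()),
            }
        }
        out
    }

A full version would also have to map each original range back to a slice of the merged fetch result, which is presumably why the commit leaves this as a TODO.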