This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new be0d34d4f Add get_byte_ranges method to AsyncFileReader trait (#2115)
be0d34d4f is described below

commit be0d34d4fd37431ed57e7ab001da5f55a016a76e
Author: Dan Harris <[email protected]>
AuthorDate: Thu Jul 21 08:05:21 2022 -0400

    Add get_byte_ranges method to AsyncFileReader trait (#2115)
    
    * Add get_byte_ranges method to AsyncFileReader trait
    
    * Remove overhead
    
    * linting
    
    * pr comments
---
 parquet/src/arrow/async_reader.rs | 60 +++++++++++++++++++++++++++++++--------
 1 file changed, 48 insertions(+), 12 deletions(-)

diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 923f329ef..19e1de9fc 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -77,6 +77,7 @@
 
 use std::collections::VecDeque;
 use std::fmt::Formatter;
+
 use std::io::{Cursor, SeekFrom};
 use std::ops::Range;
 use std::pin::Pin;
@@ -111,6 +112,27 @@ pub trait AsyncFileReader {
     /// Retrieve the bytes in `range`
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
 
+    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, Result<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        async move {
+            let mut result = Vec::with_capacity(ranges.len());
+
+            for range in ranges.into_iter() {
+                let data = self.get_bytes(range).await?;
+                result.push(data);
+            }
+
+            Ok(result)
+        }
+        .boxed()
+    }
+
     /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
     /// allowing fine-grained control over how metadata is sourced, in particular allowing
     /// for caching, pre-fetching, catalog metadata, etc...
@@ -367,24 +389,38 @@ where
                                 vec![None; row_group_metadata.columns().len()];
 
                             // TODO: Combine consecutive ranges
+                            let fetch_ranges = (0..column_chunks.len())
+                                .into_iter()
+                                .filter_map(|idx| {
+                                    if !projection.leaf_included(idx) {
+                                        None
+                                    } else {
+                                        let column = row_group_metadata.column(idx);
+                                        let (start, length) = column.byte_range();
+
+                                        Some(start as usize..(start + length) as usize)
+                                    }
+                                })
+                                .collect();
+
+                            let mut chunk_data =
+                                input.get_byte_ranges(fetch_ranges).await?.into_iter();
+
                             for (idx, chunk) in column_chunks.iter_mut().enumerate() {
                                 if !projection.leaf_included(idx) {
                                     continue;
                                 }
 
                                 let column = row_group_metadata.column(idx);
-                                let (start, length) = column.byte_range();
-
-                                let data = input
-                                    .get_bytes(start as usize..(start + length) as usize)
-                                    .await?;
-
-                                *chunk = Some(InMemoryColumnChunk {
-                                    num_values: column.num_values(),
-                                    compression: column.compression(),
-                                    physical_type: column.column_type(),
-                                    data,
-                                });
+
+                                if let Some(data) = chunk_data.next() {
+                                    *chunk = Some(InMemoryColumnChunk {
+                                        num_values: column.num_values(),
+                                        compression: column.compression(),
+                                        physical_type: column.column_type(),
+                                        data,
+                                    });
+                                }
                             }
 
                             Ok((

Reply via email to