This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new dfb8c7696 Add offset pushdown to parquet (#3848)
dfb8c7696 is described below
commit dfb8c769606efd4fd8731706b287993479b339ca
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Mar 14 08:43:57 2023 +0000
Add offset pushdown to parquet (#3848)
---
parquet/src/arrow/arrow_reader/mod.rs | 92 +++++++++++++++++----
parquet/src/arrow/arrow_reader/selection.rs | 124 ++++++++++++++++++++++++----
parquet/src/arrow/async_reader/mod.rs | 114 ++++++++++++++++++++-----
3 files changed, 280 insertions(+), 50 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index c4b645da7..6c8d08de2 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -71,6 +71,8 @@ pub struct ArrowReaderBuilder<T> {
pub(crate) selection: Option<RowSelection>,
pub(crate) limit: Option<usize>,
+
+ pub(crate) offset: Option<usize>,
}
impl<T> ArrowReaderBuilder<T> {
@@ -101,6 +103,7 @@ impl<T> ArrowReaderBuilder<T> {
filter: None,
selection: None,
limit: None,
+ offset: None,
})
}
@@ -181,6 +184,17 @@ impl<T> ArrowReaderBuilder<T> {
..self
}
}
+
+ /// Provide an offset to skip over the given number of rows
+ ///
+ /// The offset will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`]
+ /// allowing it to skip rows after any pushed down predicates
+ pub fn with_offset(self, offset: usize) -> Self {
+ Self {
+ offset: Some(offset),
+ ..self
+ }
+ }
}
/// Arrow reader api.
@@ -467,23 +481,10 @@ impl<T: ChunkReader + 'static>
ArrowReaderBuilder<SyncReader<T>> {
selection = Some(RowSelection::from(vec![]));
}
- // If a limit is defined, apply it to the final `RowSelection`
- if let Some(limit) = self.limit {
- selection = Some(
- selection
- .map(|selection| selection.limit(limit))
- .unwrap_or_else(|| {
- RowSelection::from(vec![RowSelector::select(
- limit.min(reader.num_rows()),
- )])
- }),
- );
- }
-
Ok(ParquetRecordBatchReader::new(
batch_size,
array_reader,
- selection,
+ apply_range(selection, reader.num_rows(), self.offset, self.limit),
))
}
}
@@ -620,6 +621,41 @@ pub(crate) fn selects_any(selection:
Option<&RowSelection>) -> bool {
selection.map(|x| x.selects_any()).unwrap_or(true)
}
+/// Applies an optional offset and limit to an optional [`RowSelection`]
+pub(crate) fn apply_range(
+ mut selection: Option<RowSelection>,
+ row_count: usize,
+ offset: Option<usize>,
+ limit: Option<usize>,
+) -> Option<RowSelection> {
+ // If an offset is defined, apply it to the `selection`
+ if let Some(offset) = offset {
+ selection = Some(match row_count.checked_sub(offset) {
+ None => RowSelection::from(vec![]),
+ Some(remaining) => selection
+ .map(|selection| selection.offset(offset))
+ .unwrap_or_else(|| {
+ RowSelection::from(vec![
+ RowSelector::skip(offset),
+ RowSelector::select(remaining),
+ ])
+ }),
+ });
+ }
+
+ // If a limit is defined, apply it to the final `selection`
+ if let Some(limit) = limit {
+ selection = Some(
+ selection
+ .map(|selection| selection.limit(limit))
+ .unwrap_or_else(|| {
+
RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
+ }),
+ );
+ }
+ selection
+}
+
/// Evaluates an [`ArrowPredicate`] returning the [`RowSelection`]
///
/// If this [`ParquetRecordBatchReader`] has a [`RowSelection`], the
@@ -1244,6 +1280,8 @@ mod tests {
row_filter: Option<Vec<bool>>,
/// limit
limit: Option<usize>,
+ /// offset
+ offset: Option<usize>,
}
/// Manually implement this to avoid printing entire contents of row_selections and row_filter
@@ -1263,6 +1301,7 @@ mod tests {
.field("row_selections", &self.row_selections.is_some())
.field("row_filter", &self.row_filter.is_some())
.field("limit", &self.limit)
+ .field("offset", &self.offset)
.finish()
}
}
@@ -1283,6 +1322,7 @@ mod tests {
row_selections: None,
row_filter: None,
limit: None,
+ offset: None,
}
}
}
@@ -1361,6 +1401,13 @@ mod tests {
}
}
+ fn with_offset(self, offset: usize) -> Self {
+ Self {
+ offset: Some(offset),
+ ..self
+ }
+ }
+
fn writer_props(&self) -> WriterProperties {
let builder = WriterProperties::builder()
.set_data_pagesize_limit(self.max_data_page_size)
@@ -1427,6 +1474,12 @@ mod tests {
TestOptions::new(4, 100, 25).with_limit(10),
// Test with limit larger than number of rows
TestOptions::new(4, 100, 25).with_limit(101),
+ // Test with limit + offset smaller than number of rows
+ TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
+ // Test with limit + offset equal to number of rows
+ TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
+ // Test with limit + offset larger than number of rows
+ TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
// Test with no page-level statistics
TestOptions::new(2, 256, 91)
.with_null_percent(25)
@@ -1474,6 +1527,12 @@ mod tests {
.with_null_percent(25)
.with_row_selections()
.with_limit(10),
+ // Test with nulls, row selections, offset and limit
+ TestOptions::new(2, 256, 93)
+ .with_null_percent(25)
+ .with_row_selections()
+ .with_offset(20)
+ .with_limit(10),
// Test filter
// Test with row filter
@@ -1673,6 +1732,11 @@ mod tests {
None => expected_data,
};
+ if let Some(offset) = opts.offset {
+ builder = builder.with_offset(offset);
+ expected_data = expected_data.into_iter().skip(offset).collect();
+ }
+
if let Some(limit) = opts.limit {
builder = builder.with_limit(limit);
expected_data = expected_data.into_iter().take(limit).collect();
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index d2af4516d..d3abf968b 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -19,7 +19,6 @@ use arrow_array::{Array, BooleanArray};
use arrow_select::filter::SlicesIterator;
use std::cmp::Ordering;
use std::collections::VecDeque;
-use std::mem;
use std::ops::Range;
/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
@@ -236,13 +235,13 @@ impl RowSelection {
let mut total_count = 0;
// Find the index where the selector exceeds the row count
- let find = self.selectors.iter().enumerate().find(|(_, selector)| {
+ let find = self.selectors.iter().position(|selector| {
total_count += selector.row_count;
total_count > row_count
});
let split_idx = match find {
- Some((idx, _)) => idx,
+ Some(idx) => idx,
None => {
let selectors = std::mem::take(&mut self.selectors);
return Self { selectors };
@@ -372,29 +371,63 @@ impl RowSelection {
self
}
+ /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows
+ pub(crate) fn offset(mut self, offset: usize) -> Self {
+ if offset == 0 {
+ return self;
+ }
+
+ let mut selected_count = 0;
+ let mut skipped_count = 0;
+
+ // Find the index of the selector at which the selected row count exceeds the offset
+ let find = self
+ .selectors
+ .iter()
+ .position(|selector| match selector.skip {
+ true => {
+ skipped_count += selector.row_count;
+ false
+ }
+ false => {
+ selected_count += selector.row_count;
+ selected_count > offset
+ }
+ });
+
+ let split_idx = match find {
+ Some(idx) => idx,
+ None => {
+ self.selectors.clear();
+ return self;
+ }
+ };
+
+ let mut selectors = Vec::with_capacity(self.selectors.len() -
split_idx + 1);
+ selectors.push(RowSelector::skip(skipped_count + offset));
+ selectors.push(RowSelector::select(selected_count - offset));
+ selectors.extend_from_slice(&self.selectors[split_idx + 1..]);
+
+ Self { selectors }
+ }
+
/// Limit this [`RowSelection`] to only select `limit` rows
pub(crate) fn limit(mut self, mut limit: usize) -> Self {
- let mut new_selectors = Vec::with_capacity(self.selectors.len());
- for mut selection in mem::take(&mut self.selectors) {
- if limit == 0 {
- break;
- }
+ if limit == 0 {
+ self.selectors.clear();
+ }
+ for (idx, selection) in self.selectors.iter_mut().enumerate() {
if !selection.skip {
if selection.row_count >= limit {
selection.row_count = limit;
- new_selectors.push(selection);
+ self.selectors.truncate(idx + 1);
break;
} else {
limit -= selection.row_count;
- new_selectors.push(selection);
}
- } else {
- new_selectors.push(selection);
}
}
-
- self.selectors = new_selectors;
self
}
@@ -403,6 +436,11 @@ impl RowSelection {
pub fn iter(&self) -> impl Iterator<Item = &RowSelector> {
self.selectors.iter()
}
+
+ /// Returns the number of selected rows
+ pub fn row_count(&self) -> usize {
+ self.iter().filter(|s| !s.skip).map(|s| s.row_count).sum()
+ }
}
impl From<Vec<RowSelector>> for RowSelection {
@@ -593,6 +631,64 @@ mod tests {
assert!(selection.selectors.is_empty());
}
+ #[test]
+ fn test_offset() {
+ let selection = RowSelection::from(vec![
+ RowSelector::select(5),
+ RowSelector::skip(23),
+ RowSelector::select(7),
+ RowSelector::skip(33),
+ RowSelector::select(6),
+ ]);
+
+ let selection = selection.offset(2);
+ assert_eq!(
+ selection.selectors,
+ vec![
+ RowSelector::skip(2),
+ RowSelector::select(3),
+ RowSelector::skip(23),
+ RowSelector::select(7),
+ RowSelector::skip(33),
+ RowSelector::select(6),
+ ]
+ );
+
+ let selection = selection.offset(5);
+ assert_eq!(
+ selection.selectors,
+ vec![
+ RowSelector::skip(30),
+ RowSelector::select(5),
+ RowSelector::skip(33),
+ RowSelector::select(6),
+ ]
+ );
+
+ let selection = selection.offset(3);
+ assert_eq!(
+ selection.selectors,
+ vec![
+ RowSelector::skip(33),
+ RowSelector::select(2),
+ RowSelector::skip(33),
+ RowSelector::select(6),
+ ]
+ );
+
+ let selection = selection.offset(2);
+ assert_eq!(
+ selection.selectors,
+ vec![RowSelector::skip(68), RowSelector::select(6),]
+ );
+
+ let selection = selection.offset(3);
+ assert_eq!(
+ selection.selectors,
+ vec![RowSelector::skip(71), RowSelector::select(3),]
+ );
+ }
+
#[test]
fn test_and() {
let mut a = RowSelection::from(vec![
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 213f61818..99fe65069 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -98,8 +98,8 @@ use arrow_schema::SchemaRef;
use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
use crate::arrow::arrow_reader::{
- evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderOptions,
- ParquetRecordBatchReader, RowFilter, RowSelection, RowSelector,
+ apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder,
ArrowReaderOptions,
+ ParquetRecordBatchReader, RowFilter, RowSelection,
};
use crate::arrow::schema::ParquetField;
use crate::arrow::ProjectionMask;
@@ -347,12 +347,13 @@ impl<T: AsyncFileReader + Send + 'static>
ArrowReaderBuilder<AsyncReader<T>> {
filter: self.filter,
metadata: self.metadata.clone(),
fields: self.fields,
+ limit: self.limit,
+ offset: self.offset,
};
Ok(ParquetRecordBatchStream {
metadata: self.metadata,
batch_size,
- limit: self.limit,
row_groups,
projection: self.projection,
selection: self.selection,
@@ -375,6 +376,10 @@ struct ReaderFactory<T> {
input: T,
filter: Option<RowFilter>,
+
+ limit: Option<usize>,
+
+ offset: Option<usize>,
}
impl<T> ReaderFactory<T>
@@ -390,7 +395,6 @@ where
mut selection: Option<RowSelection>,
projection: ProjectionMask,
batch_size: usize,
- limit: Option<usize>,
) -> ReadResult<T> {
// TODO: calling build_array multiple times is wasteful
@@ -428,19 +432,37 @@ where
}
}
- if !selects_any(selection.as_ref()) {
+ // Compute the number of rows in the selection before applying limit and offset
+ let rows_before = selection
+ .as_ref()
+ .map(|s| s.row_count())
+ .unwrap_or(row_group.row_count);
+
+ if rows_before == 0 {
+ return Ok((self, None));
+ }
+
+ selection = apply_range(selection, row_group.row_count, self.offset,
self.limit);
+
+ // Compute the number of rows in the selection after applying limit and offset
+ let rows_after = selection
+ .as_ref()
+ .map(|s| s.row_count())
+ .unwrap_or(row_group.row_count);
+
+ // Update offset if necessary
+ if let Some(offset) = &mut self.offset {
+ // Any reduction is due to either the offset or the limit. As the limit is only
+ // applied after the offset has been "exhausted", we can just use saturating sub here
+ *offset = offset.saturating_sub(rows_before - rows_after)
+ }
+
+ if rows_after == 0 {
return Ok((self, None));
}
- // If a limit is defined, apply it to the final `RowSelection`
- if let Some(limit) = limit {
- selection = Some(
- selection
- .map(|selection| selection.limit(limit))
- .unwrap_or_else(|| {
- RowSelection::from(vec![RowSelector::select(limit)])
- }),
- );
+ if let Some(limit) = &mut self.limit {
+ *limit -= rows_after;
}
row_group
@@ -492,8 +514,6 @@ pub struct ParquetRecordBatchStream<T> {
batch_size: usize,
- limit: Option<usize>,
-
selection: Option<RowSelection>,
/// This is an option so it can be moved into a future
@@ -535,9 +555,6 @@ where
match &mut self.state {
StreamState::Decoding(batch_reader) => match
batch_reader.next() {
Some(Ok(batch)) => {
- if let Some(limit) = self.limit.as_mut() {
- *limit -= batch.num_rows();
- }
return Poll::Ready(Some(Ok(batch)));
}
Some(Err(e)) => {
@@ -568,7 +585,6 @@ where
selection,
self.projection.clone(),
self.batch_size,
- self.limit,
)
.boxed();
@@ -824,11 +840,14 @@ mod tests {
use crate::file::page_index::index_reader;
use crate::file::properties::WriterProperties;
use arrow::error::Result as ArrowResult;
+ use arrow_array::cast::as_primitive_array;
+ use arrow_array::types::Int32Type;
use arrow_array::{Array, ArrayRef, Int32Array, StringArray};
use futures::TryStreamExt;
use rand::{thread_rng, Rng};
use std::sync::Mutex;
+ #[derive(Clone)]
struct TestReader {
data: Bytes,
metadata: Arc<ParquetMetaData>,
@@ -1320,7 +1339,7 @@ mod tests {
requests: Default::default(),
};
- let stream = ParquetRecordBatchStreamBuilder::new(test)
+ let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
.await
.unwrap()
.with_batch_size(1024)
@@ -1336,11 +1355,60 @@ mod tests {
// First batch should contain all rows
assert_eq!(batch.num_rows(), 3);
assert_eq!(batch.num_columns(), 3);
+ let col2 = as_primitive_array::<Int32Type>(batch.column(2));
+ assert_eq!(col2.values(), &[0, 1, 2]);
let batch = &batches[1];
// Second batch should trigger the limit and only have one row
assert_eq!(batch.num_rows(), 1);
assert_eq!(batch.num_columns(), 3);
+ let col2 = as_primitive_array::<Int32Type>(batch.column(2));
+ assert_eq!(col2.values(), &[3]);
+
+ let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
+ .await
+ .unwrap()
+ .with_offset(2)
+ .with_limit(3)
+ .build()
+ .unwrap();
+
+ let batches: Vec<_> = stream.try_collect().await.unwrap();
+ // Expect one batch for each row group
+ assert_eq!(batches.len(), 2);
+
+ let batch = &batches[0];
+ // First batch should contain one row
+ assert_eq!(batch.num_rows(), 1);
+ assert_eq!(batch.num_columns(), 3);
+ let col2 = as_primitive_array::<Int32Type>(batch.column(2));
+ assert_eq!(col2.values(), &[2]);
+
+ let batch = &batches[1];
+ // Second batch should contain two rows
+ assert_eq!(batch.num_rows(), 2);
+ assert_eq!(batch.num_columns(), 3);
+ let col2 = as_primitive_array::<Int32Type>(batch.column(2));
+ assert_eq!(col2.values(), &[3, 4]);
+
+ let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
+ .await
+ .unwrap()
+ .with_offset(4)
+ .with_limit(20)
+ .build()
+ .unwrap();
+
+ let batches: Vec<_> = stream.try_collect().await.unwrap();
+ // Should skip first row group
+ assert_eq!(batches.len(), 1);
+
+ let batch = &batches[0];
+ // First batch should contain two rows
+ assert_eq!(batch.num_rows(), 2);
+ assert_eq!(batch.num_columns(), 3);
+ let col2 = as_primitive_array::<Int32Type>(batch.column(2));
+ assert_eq!(col2.values(), &[4, 5]);
}
#[tokio::test]
@@ -1440,6 +1508,8 @@ mod tests {
fields,
input: async_reader,
filter: None,
+ limit: None,
+ offset: None,
};
let mut skip = true;
@@ -1469,7 +1539,7 @@ mod tests {
let selection = RowSelection::from(selectors);
let (_factory, _reader) = reader_factory
- .read_row_group(0, Some(selection), projection.clone(), 48, None)
+ .read_row_group(0, Some(selection), projection.clone(), 48)
.await
.expect("reading row group");