This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new dfb8c7696 Add offset pushdown to parquet (#3848)
dfb8c7696 is described below

commit dfb8c769606efd4fd8731706b287993479b339ca
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Mar 14 08:43:57 2023 +0000

    Add offset pushdown to parquet (#3848)
---
 parquet/src/arrow/arrow_reader/mod.rs       |  92 +++++++++++++++++----
 parquet/src/arrow/arrow_reader/selection.rs | 124 ++++++++++++++++++++++++----
 parquet/src/arrow/async_reader/mod.rs       | 114 ++++++++++++++++++++-----
 3 files changed, 280 insertions(+), 50 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs 
b/parquet/src/arrow/arrow_reader/mod.rs
index c4b645da7..6c8d08de2 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -71,6 +71,8 @@ pub struct ArrowReaderBuilder<T> {
     pub(crate) selection: Option<RowSelection>,
 
     pub(crate) limit: Option<usize>,
+
+    pub(crate) offset: Option<usize>,
 }
 
 impl<T> ArrowReaderBuilder<T> {
@@ -101,6 +103,7 @@ impl<T> ArrowReaderBuilder<T> {
             filter: None,
             selection: None,
             limit: None,
+            offset: None,
         })
     }
 
@@ -181,6 +184,17 @@ impl<T> ArrowReaderBuilder<T> {
             ..self
         }
     }
+
+    /// Provide an offset to skip over the given number of rows
+    ///
+    /// The offset will be applied after any [`Self::with_row_selection`] and 
[`Self::with_row_filter`]
+    /// allowing it to skip rows after any pushed down predicates
+    pub fn with_offset(self, offset: usize) -> Self {
+        Self {
+            offset: Some(offset),
+            ..self
+        }
+    }
 }
 
 /// Arrow reader api.
@@ -467,23 +481,10 @@ impl<T: ChunkReader + 'static> 
ArrowReaderBuilder<SyncReader<T>> {
             selection = Some(RowSelection::from(vec![]));
         }
 
-        // If a limit is defined, apply it to the final `RowSelection`
-        if let Some(limit) = self.limit {
-            selection = Some(
-                selection
-                    .map(|selection| selection.limit(limit))
-                    .unwrap_or_else(|| {
-                        RowSelection::from(vec![RowSelector::select(
-                            limit.min(reader.num_rows()),
-                        )])
-                    }),
-            );
-        }
-
         Ok(ParquetRecordBatchReader::new(
             batch_size,
             array_reader,
-            selection,
+            apply_range(selection, reader.num_rows(), self.offset, self.limit),
         ))
     }
 }
@@ -620,6 +621,41 @@ pub(crate) fn selects_any(selection: 
Option<&RowSelection>) -> bool {
     selection.map(|x| x.selects_any()).unwrap_or(true)
 }
 
+/// Applies an optional offset and limit to an optional [`RowSelection`]
+pub(crate) fn apply_range(
+    mut selection: Option<RowSelection>,
+    row_count: usize,
+    offset: Option<usize>,
+    limit: Option<usize>,
+) -> Option<RowSelection> {
+    // If an offset is defined, apply it to the `selection`
+    if let Some(offset) = offset {
+        selection = Some(match row_count.checked_sub(offset) {
+            None => RowSelection::from(vec![]),
+            Some(remaining) => selection
+                .map(|selection| selection.offset(offset))
+                .unwrap_or_else(|| {
+                    RowSelection::from(vec![
+                        RowSelector::skip(offset),
+                        RowSelector::select(remaining),
+                    ])
+                }),
+        });
+    }
+
+    // If a limit is defined, apply it to the final `selection`
+    if let Some(limit) = limit {
+        selection = Some(
+            selection
+                .map(|selection| selection.limit(limit))
+                .unwrap_or_else(|| {
+                    
RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
+                }),
+        );
+    }
+    selection
+}
+
 /// Evaluates an [`ArrowPredicate`] returning the [`RowSelection`]
 ///
 /// If this [`ParquetRecordBatchReader`] has a [`RowSelection`], the
@@ -1244,6 +1280,8 @@ mod tests {
         row_filter: Option<Vec<bool>>,
         /// limit
         limit: Option<usize>,
+        /// offset
+        offset: Option<usize>,
     }
 
     /// Manually implement this to avoid printing entire contents of 
row_selections and row_filter
@@ -1263,6 +1301,7 @@ mod tests {
                 .field("row_selections", &self.row_selections.is_some())
                 .field("row_filter", &self.row_filter.is_some())
                 .field("limit", &self.limit)
+                .field("offset", &self.offset)
                 .finish()
         }
     }
@@ -1283,6 +1322,7 @@ mod tests {
                 row_selections: None,
                 row_filter: None,
                 limit: None,
+                offset: None,
             }
         }
     }
@@ -1361,6 +1401,13 @@ mod tests {
             }
         }
 
+        fn with_offset(self, offset: usize) -> Self {
+            Self {
+                offset: Some(offset),
+                ..self
+            }
+        }
+
         fn writer_props(&self) -> WriterProperties {
             let builder = WriterProperties::builder()
                 .set_data_pagesize_limit(self.max_data_page_size)
@@ -1427,6 +1474,12 @@ mod tests {
             TestOptions::new(4, 100, 25).with_limit(10),
             // Test with limit larger than number of rows
             TestOptions::new(4, 100, 25).with_limit(101),
+            // Test with limit + offset equal to number of rows
+            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
+            // Test with limit + offset equal to number of rows
+            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
+            // Test with limit + offset larger than number of rows
+            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
             // Test with no page-level statistics
             TestOptions::new(2, 256, 91)
                 .with_null_percent(25)
@@ -1474,6 +1527,12 @@ mod tests {
                 .with_null_percent(25)
                 .with_row_selections()
                 .with_limit(10),
+            // Test optional with nulls
+            TestOptions::new(2, 256, 93)
+                .with_null_percent(25)
+                .with_row_selections()
+                .with_offset(20)
+                .with_limit(10),
             // Test filter
 
             // Test with row filter
@@ -1673,6 +1732,11 @@ mod tests {
             None => expected_data,
         };
 
+        if let Some(offset) = opts.offset {
+            builder = builder.with_offset(offset);
+            expected_data = expected_data.into_iter().skip(offset).collect();
+        }
+
         if let Some(limit) = opts.limit {
             builder = builder.with_limit(limit);
             expected_data = expected_data.into_iter().take(limit).collect();
diff --git a/parquet/src/arrow/arrow_reader/selection.rs 
b/parquet/src/arrow/arrow_reader/selection.rs
index d2af4516d..d3abf968b 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -19,7 +19,6 @@ use arrow_array::{Array, BooleanArray};
 use arrow_select::filter::SlicesIterator;
 use std::cmp::Ordering;
 use std::collections::VecDeque;
-use std::mem;
 use std::ops::Range;
 
 /// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
@@ -236,13 +235,13 @@ impl RowSelection {
         let mut total_count = 0;
 
         // Find the index where the selector exceeds the row count
-        let find = self.selectors.iter().enumerate().find(|(_, selector)| {
+        let find = self.selectors.iter().position(|selector| {
             total_count += selector.row_count;
             total_count > row_count
         });
 
         let split_idx = match find {
-            Some((idx, _)) => idx,
+            Some(idx) => idx,
             None => {
                 let selectors = std::mem::take(&mut self.selectors);
                 return Self { selectors };
@@ -372,29 +371,63 @@ impl RowSelection {
         self
     }
 
+    /// Applies an offset to this [`RowSelection`], skipping the first 
`offset` selected rows
+    pub(crate) fn offset(mut self, offset: usize) -> Self {
+        if offset == 0 {
+            return self;
+        }
+
+        let mut selected_count = 0;
+        let mut skipped_count = 0;
+
+        // Find the index where the selector exceeds the row count
+        let find = self
+            .selectors
+            .iter()
+            .position(|selector| match selector.skip {
+                true => {
+                    skipped_count += selector.row_count;
+                    false
+                }
+                false => {
+                    selected_count += selector.row_count;
+                    selected_count > offset
+                }
+            });
+
+        let split_idx = match find {
+            Some(idx) => idx,
+            None => {
+                self.selectors.clear();
+                return self;
+            }
+        };
+
+        let mut selectors = Vec::with_capacity(self.selectors.len() - 
split_idx + 1);
+        selectors.push(RowSelector::skip(skipped_count + offset));
+        selectors.push(RowSelector::select(selected_count - offset));
+        selectors.extend_from_slice(&self.selectors[split_idx + 1..]);
+
+        Self { selectors }
+    }
+
     /// Limit this [`RowSelection`] to only select `limit` rows
     pub(crate) fn limit(mut self, mut limit: usize) -> Self {
-        let mut new_selectors = Vec::with_capacity(self.selectors.len());
-        for mut selection in mem::take(&mut self.selectors) {
-            if limit == 0 {
-                break;
-            }
+        if limit == 0 {
+            self.selectors.clear();
+        }
 
+        for (idx, selection) in self.selectors.iter_mut().enumerate() {
             if !selection.skip {
                 if selection.row_count >= limit {
                     selection.row_count = limit;
-                    new_selectors.push(selection);
+                    self.selectors.truncate(idx + 1);
                     break;
                 } else {
                     limit -= selection.row_count;
-                    new_selectors.push(selection);
                 }
-            } else {
-                new_selectors.push(selection);
             }
         }
-
-        self.selectors = new_selectors;
         self
     }
 
@@ -403,6 +436,11 @@ impl RowSelection {
     pub fn iter(&self) -> impl Iterator<Item = &RowSelector> {
         self.selectors.iter()
     }
+
+    /// Returns the number of selected rows
+    pub fn row_count(&self) -> usize {
+        self.iter().filter(|s| !s.skip).map(|s| s.row_count).sum()
+    }
 }
 
 impl From<Vec<RowSelector>> for RowSelection {
@@ -593,6 +631,64 @@ mod tests {
         assert!(selection.selectors.is_empty());
     }
 
+    #[test]
+    fn test_offset() {
+        let selection = RowSelection::from(vec![
+            RowSelector::select(5),
+            RowSelector::skip(23),
+            RowSelector::select(7),
+            RowSelector::skip(33),
+            RowSelector::select(6),
+        ]);
+
+        let selection = selection.offset(2);
+        assert_eq!(
+            selection.selectors,
+            vec![
+                RowSelector::skip(2),
+                RowSelector::select(3),
+                RowSelector::skip(23),
+                RowSelector::select(7),
+                RowSelector::skip(33),
+                RowSelector::select(6),
+            ]
+        );
+
+        let selection = selection.offset(5);
+        assert_eq!(
+            selection.selectors,
+            vec![
+                RowSelector::skip(30),
+                RowSelector::select(5),
+                RowSelector::skip(33),
+                RowSelector::select(6),
+            ]
+        );
+
+        let selection = selection.offset(3);
+        assert_eq!(
+            selection.selectors,
+            vec![
+                RowSelector::skip(33),
+                RowSelector::select(2),
+                RowSelector::skip(33),
+                RowSelector::select(6),
+            ]
+        );
+
+        let selection = selection.offset(2);
+        assert_eq!(
+            selection.selectors,
+            vec![RowSelector::skip(68), RowSelector::select(6),]
+        );
+
+        let selection = selection.offset(3);
+        assert_eq!(
+            selection.selectors,
+            vec![RowSelector::skip(71), RowSelector::select(3),]
+        );
+    }
+
     #[test]
     fn test_and() {
         let mut a = RowSelection::from(vec![
diff --git a/parquet/src/arrow/async_reader/mod.rs 
b/parquet/src/arrow/async_reader/mod.rs
index 213f61818..99fe65069 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -98,8 +98,8 @@ use arrow_schema::SchemaRef;
 
 use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
 use crate::arrow::arrow_reader::{
-    evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderOptions,
-    ParquetRecordBatchReader, RowFilter, RowSelection, RowSelector,
+    apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, 
ArrowReaderOptions,
+    ParquetRecordBatchReader, RowFilter, RowSelection,
 };
 use crate::arrow::schema::ParquetField;
 use crate::arrow::ProjectionMask;
@@ -347,12 +347,13 @@ impl<T: AsyncFileReader + Send + 'static> 
ArrowReaderBuilder<AsyncReader<T>> {
             filter: self.filter,
             metadata: self.metadata.clone(),
             fields: self.fields,
+            limit: self.limit,
+            offset: self.offset,
         };
 
         Ok(ParquetRecordBatchStream {
             metadata: self.metadata,
             batch_size,
-            limit: self.limit,
             row_groups,
             projection: self.projection,
             selection: self.selection,
@@ -375,6 +376,10 @@ struct ReaderFactory<T> {
     input: T,
 
     filter: Option<RowFilter>,
+
+    limit: Option<usize>,
+
+    offset: Option<usize>,
 }
 
 impl<T> ReaderFactory<T>
@@ -390,7 +395,6 @@ where
         mut selection: Option<RowSelection>,
         projection: ProjectionMask,
         batch_size: usize,
-        limit: Option<usize>,
     ) -> ReadResult<T> {
         // TODO: calling build_array multiple times is wasteful
 
@@ -428,19 +432,37 @@ where
             }
         }
 
-        if !selects_any(selection.as_ref()) {
+        // Compute the number of rows in the selection before applying limit 
and offset
+        let rows_before = selection
+            .as_ref()
+            .map(|s| s.row_count())
+            .unwrap_or(row_group.row_count);
+
+        if rows_before == 0 {
+            return Ok((self, None));
+        }
+
+        selection = apply_range(selection, row_group.row_count, self.offset, 
self.limit);
+
+        // Compute the number of rows in the selection after applying limit 
and offset
+        let rows_after = selection
+            .as_ref()
+            .map(|s| s.row_count())
+            .unwrap_or(row_group.row_count);
+
+        // Update offset if necessary
+        if let Some(offset) = &mut self.offset {
+            // Reduction is either because of offset or limit, as limit is 
applied
+            // after offset has been "exhausted" can just use saturating sub 
here
+            *offset = offset.saturating_sub(rows_before - rows_after)
+        }
+
+        if rows_after == 0 {
             return Ok((self, None));
         }
 
-        // If a limit is defined, apply it to the final `RowSelection`
-        if let Some(limit) = limit {
-            selection = Some(
-                selection
-                    .map(|selection| selection.limit(limit))
-                    .unwrap_or_else(|| {
-                        RowSelection::from(vec![RowSelector::select(limit)])
-                    }),
-            );
+        if let Some(limit) = &mut self.limit {
+            *limit -= rows_after;
         }
 
         row_group
@@ -492,8 +514,6 @@ pub struct ParquetRecordBatchStream<T> {
 
     batch_size: usize,
 
-    limit: Option<usize>,
-
     selection: Option<RowSelection>,
 
     /// This is an option so it can be moved into a future
@@ -535,9 +555,6 @@ where
             match &mut self.state {
                 StreamState::Decoding(batch_reader) => match 
batch_reader.next() {
                     Some(Ok(batch)) => {
-                        if let Some(limit) = self.limit.as_mut() {
-                            *limit -= batch.num_rows();
-                        }
                         return Poll::Ready(Some(Ok(batch)));
                     }
                     Some(Err(e)) => {
@@ -568,7 +585,6 @@ where
                             selection,
                             self.projection.clone(),
                             self.batch_size,
-                            self.limit,
                         )
                         .boxed();
 
@@ -824,11 +840,14 @@ mod tests {
     use crate::file::page_index::index_reader;
     use crate::file::properties::WriterProperties;
     use arrow::error::Result as ArrowResult;
+    use arrow_array::cast::as_primitive_array;
+    use arrow_array::types::Int32Type;
     use arrow_array::{Array, ArrayRef, Int32Array, StringArray};
     use futures::TryStreamExt;
     use rand::{thread_rng, Rng};
     use std::sync::Mutex;
 
+    #[derive(Clone)]
     struct TestReader {
         data: Bytes,
         metadata: Arc<ParquetMetaData>,
@@ -1320,7 +1339,7 @@ mod tests {
             requests: Default::default(),
         };
 
-        let stream = ParquetRecordBatchStreamBuilder::new(test)
+        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
             .await
             .unwrap()
             .with_batch_size(1024)
@@ -1336,11 +1355,60 @@ mod tests {
         // First batch should contain all rows
         assert_eq!(batch.num_rows(), 3);
         assert_eq!(batch.num_columns(), 3);
+        let col2 = as_primitive_array::<Int32Type>(batch.column(2));
+        assert_eq!(col2.values(), &[0, 1, 2]);
 
         let batch = &batches[1];
         // Second batch should trigger the limit and only have one row
         assert_eq!(batch.num_rows(), 1);
         assert_eq!(batch.num_columns(), 3);
+        let col2 = as_primitive_array::<Int32Type>(batch.column(2));
+        assert_eq!(col2.values(), &[3]);
+
+        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
+            .await
+            .unwrap()
+            .with_offset(2)
+            .with_limit(3)
+            .build()
+            .unwrap();
+
+        let batches: Vec<_> = stream.try_collect().await.unwrap();
+        // Expect one batch for each row group
+        assert_eq!(batches.len(), 2);
+
+        let batch = &batches[0];
+        // First batch should contain one row
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 3);
+        let col2 = as_primitive_array::<Int32Type>(batch.column(2));
+        assert_eq!(col2.values(), &[2]);
+
+        let batch = &batches[1];
+        // Second batch should contain two rows
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 3);
+        let col2 = as_primitive_array::<Int32Type>(batch.column(2));
+        assert_eq!(col2.values(), &[3, 4]);
+
+        let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
+            .await
+            .unwrap()
+            .with_offset(4)
+            .with_limit(20)
+            .build()
+            .unwrap();
+
+        let batches: Vec<_> = stream.try_collect().await.unwrap();
+        // Should skip first row group
+        assert_eq!(batches.len(), 1);
+
+        let batch = &batches[0];
+        // First batch should contain two rows
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 3);
+        let col2 = as_primitive_array::<Int32Type>(batch.column(2));
+        assert_eq!(col2.values(), &[4, 5]);
     }
 
     #[tokio::test]
@@ -1440,6 +1508,8 @@ mod tests {
             fields,
             input: async_reader,
             filter: None,
+            limit: None,
+            offset: None,
         };
 
         let mut skip = true;
@@ -1469,7 +1539,7 @@ mod tests {
         let selection = RowSelection::from(selectors);
 
         let (_factory, _reader) = reader_factory
-            .read_row_group(0, Some(selection), projection.clone(), 48, None)
+            .read_row_group(0, Some(selection), projection.clone(), 48)
             .await
             .expect("reading row group");
 

Reply via email to