This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bca71e  Introduce `ReadOptions` with builder API, for parquet filter row groups that satisfy all filters, and enable filter row groups by range. (#1389)
2bca71e is described below

commit 2bca71e322fcab6c6d93a47ef71638a617e29f6c
Author: Yijie Shen <[email protected]>
AuthorDate: Sun Mar 6 19:44:54 2022 +0800

    Introduce `ReadOptions` with builder API, for parquet filter row groups that satisfy all filters, and enable filter row groups by range. (#1389)
    
    * Filter row groups by comparing midpoint with offset range
    
    * lint
    
    * ReadOptions with builder API
    
    * fix comments
    
    * precise range doc
    
    * tab to space
---
 parquet/src/file/serialized_reader.rs | 185 ++++++++++++++++++++++++++++++----
 1 file changed, 168 insertions(+), 17 deletions(-)

diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index c91e832..4c10d26 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -127,6 +127,57 @@ pub struct SerializedFileReader<R: ChunkReader> {
     metadata: ParquetMetaData,
 }
 
+/// A builder for [`ReadOptions`].
+/// For the predicates that are added to the builder,
+/// they will be chained using 'AND' to filter the row groups.
+pub struct ReadOptionsBuilder {
+    predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>>,
+}
+
+impl ReadOptionsBuilder {
+    /// New builder
+    pub fn new() -> Self {
+        ReadOptionsBuilder { predicates: vec![] }
+    }
+
+    /// Add a predicate on row group metadata to the reading option,
+    /// Filter only row groups that match the predicate criteria
+    pub fn with_predicate(
+        mut self,
+        predicate: Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>,
+    ) -> Self {
+        self.predicates.push(predicate);
+        self
+    }
+
+    /// Add a range predicate on filtering row groups if their midpoints are within
+    /// the Closed-Open range `[start..end) {x | start <= x < end}`
+    pub fn with_range(mut self, start: i64, end: i64) -> Self {
+        assert!(start < end);
+        let predicate = move |rg: &RowGroupMetaData, _: usize| {
+            let mid = get_midpoint_offset(rg);
+            mid >= start && mid < end
+        };
+        self.predicates.push(Box::new(predicate));
+        self
+    }
+
+    /// Seal the builder and return the read options
+    pub fn build(self) -> ReadOptions {
+        ReadOptions {
+            predicates: self.predicates,
+        }
+    }
+}
+
+/// A collection of options for reading a Parquet file.
+///
+/// Currently, only predicates on row group metadata are supported.
+/// All predicates will be chained using 'AND' to filter the row groups.
+pub struct ReadOptions {
+    predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>>,
+}
+
 impl<R: 'static + ChunkReader> SerializedFileReader<R> {
     /// Creates file reader from a Parquet file.
     /// Returns error if Parquet file does not exist or is corrupt.
@@ -138,25 +189,48 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
         })
     }
 
-    /// Filters row group metadata to only those row groups,
-    /// for which the predicate function returns true
-    pub fn filter_row_groups(
-        &mut self,
-        predicate: &dyn Fn(&RowGroupMetaData, usize) -> bool,
-    ) {
+    /// Creates file reader from a Parquet file with read options.
+    /// Returns error if Parquet file does not exist or is corrupt.
+    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
+        let metadata = footer::parse_metadata(&chunk_reader)?;
+        let mut predicates = options.predicates;
+        let row_groups = metadata.row_groups().to_vec();
         let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
-        for (i, row_group_metadata) in self.metadata.row_groups().iter().enumerate() {
-            if predicate(row_group_metadata, i) {
-                filtered_row_groups.push(row_group_metadata.clone());
+        for (i, rg_meta) in row_groups.into_iter().enumerate() {
+            let mut keep = true;
+            for predicate in &mut predicates {
+                if !predicate(&rg_meta, i) {
+                    keep = false;
+                    break;
+                }
+            }
+            if keep {
+                filtered_row_groups.push(rg_meta);
             }
         }
-        self.metadata = ParquetMetaData::new(
-            self.metadata.file_metadata().clone(),
-            filtered_row_groups,
-        );
+
+        Ok(Self {
+            chunk_reader: Arc::new(chunk_reader),
+            metadata: ParquetMetaData::new(
+                metadata.file_metadata().clone(),
+                filtered_row_groups,
+            ),
+        })
     }
 }
 
+/// Get midpoint offset for a row group
+fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
+    let col = meta.column(0);
+    let mut offset = col.data_page_offset();
+    if let Some(dic_offset) = col.dictionary_page_offset() {
+        if offset > dic_offset {
+            offset = dic_offset
+        }
+    };
+    offset + meta.compressed_size() / 2
+}
+
 impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
     fn metadata(&self) -> &ParquetMetaData {
         &self.metadata
@@ -790,19 +864,96 @@ mod tests {
     }
 
     #[test]
-    fn test_file_reader_filter_row_groups() -> Result<()> {
+    fn test_file_reader_with_no_filter() -> Result<()> {
+        let test_file = get_test_file("alltypes_plain.parquet");
+        let origin_reader = SerializedFileReader::new(test_file)?;
+        // test initial number of row groups
+        let metadata = origin_reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+        Ok(())
+    }
+
+    #[test]
+    fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
         let test_file = get_test_file("alltypes_plain.parquet");
-        let mut reader = SerializedFileReader::new(test_file)?;
+        let read_options = ReadOptionsBuilder::new()
+            .with_predicate(Box::new(|_, _| false))
+            .build();
+        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 0);
+        Ok(())
+    }
 
+    #[test]
+    fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
+        let test_file = get_test_file("alltypes_plain.parquet");
+        let origin_reader = SerializedFileReader::new(test_file)?;
         // test initial number of row groups
+        let metadata = origin_reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+        let mid = get_midpoint_offset(metadata.row_group(0));
+
+        let test_file = get_test_file("alltypes_plain.parquet");
+        let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
+        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let test_file = get_test_file("alltypes_plain.parquet");
+        let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
+        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 0);
+        Ok(())
+    }
+
+    #[test]
+    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
+        let test_file = get_test_file("alltypes_plain.parquet");
+        let origin_reader = SerializedFileReader::new(test_file)?;
+        let metadata = origin_reader.metadata();
+        let mid = get_midpoint_offset(metadata.row_group(0));
+
+        // true, true predicate
+        let test_file = get_test_file("alltypes_plain.parquet");
+        let read_options = ReadOptionsBuilder::new()
+            .with_predicate(Box::new(|_, _| true))
+            .with_range(mid, mid + 1)
+            .build();
+        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
         let metadata = reader.metadata();
         assert_eq!(metadata.num_row_groups(), 1);
 
-        // test filtering out all row groups
-        reader.filter_row_groups(&|_, _| false);
+        // true, false predicate
+        let test_file = get_test_file("alltypes_plain.parquet");
+        let read_options = ReadOptionsBuilder::new()
+            .with_predicate(Box::new(|_, _| true))
+            .with_range(0, mid)
+            .build();
+        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
         let metadata = reader.metadata();
         assert_eq!(metadata.num_row_groups(), 0);
 
+        // false, true predicate
+        let test_file = get_test_file("alltypes_plain.parquet");
+        let read_options = ReadOptionsBuilder::new()
+            .with_predicate(Box::new(|_, _| false))
+            .with_range(mid, mid + 1)
+            .build();
+        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 0);
+
+        // false, false predicate
+        let test_file = get_test_file("alltypes_plain.parquet");
+        let read_options = ReadOptionsBuilder::new()
+            .with_predicate(Box::new(|_, _| false))
+            .with_range(0, mid)
+            .build();
+        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 0);
         Ok(())
     }
 }

Reply via email to