This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 2bca71e Introduce `ReadOptions` with a builder API for filtering parquet
row groups that satisfy all filters, and enable filtering row groups by range.
(#1389)
2bca71e is described below
commit 2bca71e322fcab6c6d93a47ef71638a617e29f6c
Author: Yijie Shen <[email protected]>
AuthorDate: Sun Mar 6 19:44:54 2022 +0800
Introduce `ReadOptions` with a builder API for filtering parquet row groups
that satisfy all filters, and enable filtering row groups by range. (#1389)
* Filter row groups by comparing midpoint with offset range
* lint
* ReadOptions with builder API
* fix comments
* precise range doc
* tab to space
---
parquet/src/file/serialized_reader.rs | 185 ++++++++++++++++++++++++++++++----
1 file changed, 168 insertions(+), 17 deletions(-)
diff --git a/parquet/src/file/serialized_reader.rs
b/parquet/src/file/serialized_reader.rs
index c91e832..4c10d26 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -127,6 +127,57 @@ pub struct SerializedFileReader<R: ChunkReader> {
metadata: ParquetMetaData,
}
+/// A builder for [`ReadOptions`].
+///
+/// Every predicate added to the builder is chained with a logical `AND`:
+/// a row group is kept only if all predicates return `true` for it.
+#[derive(Default)]
+pub struct ReadOptionsBuilder {
+    predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>>,
+}
+
+impl ReadOptionsBuilder {
+    /// Creates a builder with no predicates.
+    ///
+    /// Building it without adding any predicate yields options that keep
+    /// every row group.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Adds a predicate on row group metadata to the read options.
+    ///
+    /// The predicate receives the row group's metadata and its index in the
+    /// file. Only row groups for which it returns `true` are kept.
+    pub fn with_predicate(
+        mut self,
+        predicate: Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>,
+    ) -> Self {
+        self.predicates.push(predicate);
+        self
+    }
+
+    /// Adds a range predicate that keeps a row group only if its midpoint
+    /// offset lies within the closed-open range `[start, end)`,
+    /// i.e. `{x | start <= x < end}`.
+    ///
+    /// Each row group has a single midpoint, so disjoint ranges never select
+    /// the same row group.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `start >= end`.
+    pub fn with_range(mut self, start: i64, end: i64) -> Self {
+        assert!(
+            start < end,
+            "invalid row group range: start ({}) must be less than end ({})",
+            start,
+            end
+        );
+        let predicate = move |rg: &RowGroupMetaData, _: usize| {
+            (start..end).contains(&get_midpoint_offset(rg))
+        };
+        self.predicates.push(Box::new(predicate));
+        self
+    }
+
+    /// Seals the builder and returns the configured [`ReadOptions`].
+    pub fn build(self) -> ReadOptions {
+        ReadOptions {
+            predicates: self.predicates,
+        }
+    }
+}
+
+/// A collection of options for reading a Parquet file.
+///
+/// Currently, only predicates on row group metadata are supported.
+/// All predicates are chained with a logical `AND` to filter the row groups.
+pub struct ReadOptions {
+    predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>>,
+}
+
impl<R: 'static + ChunkReader> SerializedFileReader<R> {
/// Creates file reader from a Parquet file.
/// Returns error if Parquet file does not exist or is corrupt.
@@ -138,25 +189,48 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
})
}
-    /// Filters row group metadata to only those row groups,
-    /// for which the predicate function returns true
-    pub fn filter_row_groups(
-        &mut self,
-        predicate: &dyn Fn(&RowGroupMetaData, usize) -> bool,
-    ) {
-        let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
-        for (i, row_group_metadata) in self.metadata.row_groups().iter().enumerate() {
-            if predicate(row_group_metadata, i) {
-                filtered_row_groups.push(row_group_metadata.clone());
-            }
-        }
-        self.metadata = ParquetMetaData::new(
-            self.metadata.file_metadata().clone(),
-            filtered_row_groups,
-        );
+    /// Creates file reader from a Parquet file with read options.
+    ///
+    /// The footer metadata is parsed once. A row group is kept only if every
+    /// predicate in `options` returns `true` for it. Predicates are called in
+    /// the order they were added, with the row group's index in the file, and
+    /// evaluation for a row group stops at the first `false`.
+    ///
+    /// Returns error if Parquet file does not exist or is corrupt.
+    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
+        let metadata = footer::parse_metadata(&chunk_reader)?;
+        let mut predicates = options.predicates;
+        // Iterate by reference and clone only the row groups that survive the
+        // predicates, instead of cloning every row group up front.
+        let filtered_row_groups: Vec<RowGroupMetaData> = metadata
+            .row_groups()
+            .iter()
+            .enumerate()
+            .filter(|&(i, rg_meta)| {
+                predicates.iter_mut().all(|predicate| predicate(rg_meta, i))
+            })
+            .map(|(_, rg_meta)| rg_meta.clone())
+            .collect();
+
+        Ok(Self {
+            chunk_reader: Arc::new(chunk_reader),
+            metadata: ParquetMetaData::new(
+                metadata.file_metadata().clone(),
+                filtered_row_groups,
+            ),
+        })
     }
}
+/// Returns the byte offset of the midpoint of a row group.
+///
+/// The row group is treated as starting at the first page of its first column
+/// chunk. That is the dictionary page when one exists at a lower offset than
+/// the first data page, and the first data page otherwise. Half of the row
+/// group's total compressed size is added to that start offset.
+///
+/// NOTE(review): `meta.column(0)` presumably panics for a row group with no
+/// columns — confirm against `RowGroupMetaData::column`.
+fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
+    let first_column = meta.column(0);
+    let data_offset = first_column.data_page_offset();
+    let start = match first_column.dictionary_page_offset() {
+        Some(dict_offset) => dict_offset.min(data_offset),
+        None => data_offset,
+    };
+    start + meta.compressed_size() / 2
+}
+
impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
fn metadata(&self) -> &ParquetMetaData {
&self.metadata
@@ -790,19 +864,96 @@ mod tests {
}
#[test]
- fn test_file_reader_filter_row_groups() -> Result<()> {
+ /// Baseline: opened without read options, `alltypes_plain.parquet` exposes its single row group.
+ fn test_file_reader_with_no_filter() -> Result<()> {
+ let test_file = get_test_file("alltypes_plain.parquet");
+ let origin_reader = SerializedFileReader::new(test_file)?;
+ // test initial number of row groups
+ let metadata = origin_reader.metadata();
+ assert_eq!(metadata.num_row_groups(), 1);
+ Ok(())
+ }
+
+ #[test]
+ /// A predicate that rejects every row group leaves the reader with no row groups.
+ fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
- let mut reader = SerializedFileReader::new(test_file)?;
+ let read_options = ReadOptionsBuilder::new()
+ .with_predicate(Box::new(|_, _| false))
+ .build();
+ let reader = SerializedFileReader::new_with_options(test_file,
read_options)?;
+ let metadata = reader.metadata();
+ assert_eq!(metadata.num_row_groups(), 0);
+ Ok(())
+ }
+ #[test]
+ /// `with_range` keeps a row group only when its midpoint falls in `[start, end)`.
+ fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
+ let test_file = get_test_file("alltypes_plain.parquet");
+ let origin_reader = SerializedFileReader::new(test_file)?;
// test initial number of row groups
+ let metadata = origin_reader.metadata();
+ assert_eq!(metadata.num_row_groups(), 1);
+ let mid = get_midpoint_offset(metadata.row_group(0));
+
+ // [0, mid + 1) contains the midpoint, so the row group is kept.
+ let test_file = get_test_file("alltypes_plain.parquet");
+ let read_options = ReadOptionsBuilder::new().with_range(0, mid +
1).build();
+ let reader = SerializedFileReader::new_with_options(test_file,
read_options)?;
+ let metadata = reader.metadata();
+ assert_eq!(metadata.num_row_groups(), 1);
+
+ // [0, mid) excludes the midpoint (the end is exclusive), so the row group is dropped.
+ let test_file = get_test_file("alltypes_plain.parquet");
+ let read_options = ReadOptionsBuilder::new().with_range(0,
mid).build();
+ let reader = SerializedFileReader::new_with_options(test_file,
read_options)?;
+ let metadata = reader.metadata();
+ assert_eq!(metadata.num_row_groups(), 0);
+ Ok(())
+ }
+
+ #[test]
+ /// `with_predicate` and `with_range` are combined with AND: the row group is kept only when both accept it.
+ fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
+ let test_file = get_test_file("alltypes_plain.parquet");
+ let origin_reader = SerializedFileReader::new(test_file)?;
+ let metadata = origin_reader.metadata();
+ let mid = get_midpoint_offset(metadata.row_group(0));
+
+ // Case labels below read (custom predicate result, range predicate result);
+ // [mid, mid + 1) contains the midpoint while [0, mid) does not.
+ // true, true predicate
+ let test_file = get_test_file("alltypes_plain.parquet");
+ let read_options = ReadOptionsBuilder::new()
+ .with_predicate(Box::new(|_, _| true))
+ .with_range(mid, mid + 1)
+ .build();
+ let reader = SerializedFileReader::new_with_options(test_file,
read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
- // test filtering out all row groups
- reader.filter_row_groups(&|_, _| false);
+ // true, false predicate
+ let test_file = get_test_file("alltypes_plain.parquet");
+ let read_options = ReadOptionsBuilder::new()
+ .with_predicate(Box::new(|_, _| true))
+ .with_range(0, mid)
+ .build();
+ let reader = SerializedFileReader::new_with_options(test_file,
read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
+ // false, true predicate
+ let test_file = get_test_file("alltypes_plain.parquet");
+ let read_options = ReadOptionsBuilder::new()
+ .with_predicate(Box::new(|_, _| false))
+ .with_range(mid, mid + 1)
+ .build();
+ let reader = SerializedFileReader::new_with_options(test_file,
read_options)?;
+ let metadata = reader.metadata();
+ assert_eq!(metadata.num_row_groups(), 0);
+
+ // false, false predicate
+ let test_file = get_test_file("alltypes_plain.parquet");
+ let read_options = ReadOptionsBuilder::new()
+ .with_predicate(Box::new(|_, _| false))
+ .with_range(0, mid)
+ .build();
+ let reader = SerializedFileReader::new_with_options(test_file,
read_options)?;
+ let metadata = reader.metadata();
+ assert_eq!(metadata.num_row_groups(), 0);
Ok(())
}
}