This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 8db99d246 Use upstream RowSelection::intersection in page index pruning (#4340)
8db99d246 is described below
commit 8db99d24661e981002e9f6a5532358689200ae5f
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Dec 3 05:22:34 2022 -0500
Use upstream RowSelection::intersection in page index pruning (#4340)
---
.../file_format/parquet/page_filter.rs | 165 ++-------------------
1 file changed, 10 insertions(+), 155 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index eb5bb2616..9182fda99 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -36,7 +36,6 @@ use parquet::{
},
format::PageLocation,
};
-use std::collections::VecDeque;
use std::sync::Arc;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
@@ -123,7 +122,7 @@ pub(crate) fn build_page_filter(
if let (Some(file_offset_indexes), Some(file_page_indexes)) =
(file_offset_indexes, file_page_indexes)
{
- let mut row_selections = VecDeque::with_capacity(page_index_predicates.len());
+ let mut row_selections = Vec::with_capacity(page_index_predicates.len());
for predicate in page_index_predicates {
// `extract_page_index_push_down_predicates` only return predicate with one col.
// when building `PruningPredicate`, some single column filter like `abs(i) = 1`
@@ -163,12 +162,11 @@ pub(crate) fn build_page_filter(
}
}
debug!(
- "Use filter and page index create RowSelection {:?} from predicate: {:?}",
- &selectors,
- predicate.predicate_expr(),
- );
- row_selections
- .push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
+ "Use filter and page index create RowSelection {:?} from predicate: {:?}",
+ &selectors,
+ predicate.predicate_expr(),
+ );
+ row_selections.push(selectors.into_iter().flatten().collect::<Vec<_>>());
}
}
let final_selection = combine_multi_col_selection(row_selections);
@@ -184,7 +182,7 @@ pub(crate) fn build_page_filter(
},
);
file_metrics.page_index_rows_filtered.add(total_skip);
- Ok(Some(final_selection.into()))
+ Ok(Some(final_selection))
} else {
Ok(None)
}
@@ -197,93 +195,14 @@ pub(crate) fn build_page_filter(
///
/// The final selection is the intersection of these `RowSelector`s:
/// * `final_selection:[ Skip(0~199), Read(200~249), Skip(250~299)]`
-fn combine_multi_col_selection(
- row_selections: VecDeque<Vec<RowSelector>>,
-) -> Vec<RowSelector> {
+fn combine_multi_col_selection(row_selections: Vec<Vec<RowSelector>>) -> RowSelection {
row_selections
.into_iter()
- .reduce(intersect_row_selection)
+ .map(RowSelection::from)
+ .reduce(|s1, s2| s1.intersection(&s2))
.unwrap()
}
-/// combine two `RowSelection` return the intersection
-/// For example:
-/// self: NNYYYYNNY
-/// other: NYNNNNNNY
-///
-/// returned: NNNNNNNNY
-/// set `need_combine` true will combine result: Select(2) + Select(1) + Skip(2) -> Select(3) + Skip(2)
-///
-/// Move to arrow-rs: https://github.com/apache/arrow-rs/issues/3003
-pub(crate) fn intersect_row_selection(
- left: Vec<RowSelector>,
- right: Vec<RowSelector>,
-) -> Vec<RowSelector> {
- let mut res = vec![];
- let mut l_iter = left.into_iter().peekable();
- let mut r_iter = right.into_iter().peekable();
-
- while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
- if a.row_count == 0 {
- l_iter.next().unwrap();
- continue;
- }
- if b.row_count == 0 {
- r_iter.next().unwrap();
- continue;
- }
- match (a.skip, b.skip) {
- // Keep both ranges
- (false, false) => {
- if a.row_count < b.row_count {
- res.push(RowSelector::select(a.row_count));
- b.row_count -= a.row_count;
- l_iter.next().unwrap();
- } else {
- res.push(RowSelector::select(b.row_count));
- a.row_count -= b.row_count;
- r_iter.next().unwrap();
- }
- }
- // skip at least one
- _ => {
- if a.row_count < b.row_count {
- res.push(RowSelector::skip(a.row_count));
- b.row_count -= a.row_count;
- l_iter.next().unwrap();
- } else {
- res.push(RowSelector::skip(b.row_count));
- a.row_count -= b.row_count;
- r_iter.next().unwrap();
- }
- }
- }
- }
- if l_iter.peek().is_some() {
- res.extend(l_iter);
- }
- if r_iter.peek().is_some() {
- res.extend(r_iter);
- }
- // combine the adjacent same operators and last zero row count
- // TODO: remove when https://github.com/apache/arrow-rs/pull/2994 is released~
-
- let mut pre = res[0];
- let mut after_combine = vec![];
- for selector in res.iter_mut().skip(1) {
- if selector.skip == pre.skip {
- pre.row_count += selector.row_count;
- } else {
- after_combine.push(pre);
- pre = *selector;
- }
- }
- if pre.row_count != 0 {
- after_combine.push(pre);
- }
- after_combine
-}
-
// Extract single col pruningPredicate from input predicate for evaluating page Index.
fn extract_page_index_push_down_predicates(
predicate: Option<&PruningPredicate>,
@@ -539,67 +458,3 @@ impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
}
}
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_combine_row_selection() {
- // a size equal b size
- let a = vec![
- RowSelector::select(5),
- RowSelector::skip(4),
- RowSelector::select(1),
- ];
- let b = vec![
- RowSelector::select(8),
- RowSelector::skip(1),
- RowSelector::select(1),
- ];
-
- let res = intersect_row_selection(a, b);
- assert_eq!(
- res,
- vec![
- RowSelector::select(5),
- RowSelector::skip(4),
- RowSelector::select(1)
- ],
- );
-
- // a size larger than b size
- let a = vec![
- RowSelector::select(3),
- RowSelector::skip(33),
- RowSelector::select(3),
- RowSelector::skip(33),
- ];
- let b = vec![RowSelector::select(36), RowSelector::skip(36)];
- let res = intersect_row_selection(a, b);
- assert_eq!(res, vec![RowSelector::select(3), RowSelector::skip(69)]);
-
- // a size less than b size
- let a = vec![RowSelector::select(3), RowSelector::skip(7)];
- let b = vec![
- RowSelector::select(2),
- RowSelector::skip(2),
- RowSelector::select(2),
- RowSelector::skip(2),
- RowSelector::select(2),
- ];
- let res = intersect_row_selection(a, b);
- assert_eq!(res, vec![RowSelector::select(2), RowSelector::skip(8)]);
-
- let a = vec![RowSelector::select(3), RowSelector::skip(7)];
- let b = vec![
- RowSelector::select(2),
- RowSelector::skip(2),
- RowSelector::select(2),
- RowSelector::skip(2),
- RowSelector::select(2),
- ];
- let res = intersect_row_selection(a, b);
- assert_eq!(res, vec![RowSelector::select(2), RowSelector::skip(8),]);
- }
-}