alamb commented on code in PR #3967:
URL: https://github.com/apache/arrow-datafusion/pull/3967#discussion_r1009390770
##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -1021,72 +1120,57 @@ fn prune_row_groups(
fn prune_pages_in_one_row_group(
group: &RowGroupMetaData,
- predicate: Option<PruningPredicate>,
- offset_indexes: Option<&Vec<Vec<PageLocation>>>,
- page_indexes: Option<&Vec<Index>>,
+ predicate: &PruningPredicate,
+ col_offset_indexes: Option<&Vec<PageLocation>>,
+ col_page_indexes: Option<&Index>,
metrics: &ParquetFileMetrics,
) -> Result<Vec<RowSelector>> {
let num_rows = group.num_rows() as usize;
- if let (Some(predicate), Some(offset_indexes), Some(page_indexes)) =
- (&predicate, offset_indexes, page_indexes)
+ if let (Some(col_offset_indexes), Some(col_page_indexes)) =
+ (col_offset_indexes, col_page_indexes)
{
let pruning_stats = PagesPruningStatistics {
- page_indexes,
- offset_indexes,
- parquet_schema: predicate.schema().as_ref(),
- // now we assume only support one col.
- col_id: *predicate
- .need_input_columns_ids()
- .iter()
- .take(1)
- .next()
- .unwrap(),
+ col_page_indexes,
+ col_offset_indexes,
};
match predicate.prune(&pruning_stats) {
Ok(values) => {
let mut vec = Vec::with_capacity(values.len());
- if let Some(cols_offset_indexes) =
- offset_indexes.get(pruning_stats.col_id)
- {
- let row_vec =
- create_row_count_in_each_page(cols_offset_indexes,
num_rows);
- assert_eq!(row_vec.len(), values.len());
- let mut sum_row = *row_vec.first().unwrap();
- let mut selected = *values.first().unwrap();
-
- for (i, &f) in values.iter().skip(1).enumerate() {
- if f == selected {
- sum_row += *row_vec.get(i).unwrap();
+ let row_vec =
create_row_count_in_each_page(col_offset_indexes, num_rows);
+ assert_eq!(row_vec.len(), values.len());
+ let mut sum_row = *row_vec.first().unwrap();
+ let mut selected = *values.first().unwrap();
+
+ for (i, &f) in values.iter().skip(1).enumerate() {
+ if f == selected {
+ sum_row += *row_vec.get(i).unwrap();
+ } else {
+ let selector = if selected {
+ RowSelector::select(sum_row)
} else {
- let selector = if selected {
- RowSelector::select(sum_row)
- } else {
- RowSelector::skip(sum_row)
- };
- vec.push(selector);
- sum_row = *row_vec.get(i).unwrap();
- selected = f;
- }
+ RowSelector::skip(sum_row)
+ };
+ vec.push(selector);
+ sum_row = *row_vec.get(i).unwrap();
+ selected = f;
}
+ }
- let selector = if selected {
- RowSelector::select(sum_row)
- } else {
- RowSelector::skip(sum_row)
- };
- vec.push(selector);
- return Ok(vec);
+ let selector = if selected {
+ RowSelector::select(sum_row)
} else {
- debug!("Error evaluating page index predicate values
missing page index col_id is{}", pruning_stats.col_id);
- metrics.predicate_evaluation_errors.add(1);
- }
+ RowSelector::skip(sum_row)
+ };
+ vec.push(selector);
+ return Ok(vec);
}
// stats filter array could not be built
- // return a closure which will not filter out any row groups
+ // return a result which will not filter out any pages
Err(e) => {
- debug!("Error evaluating page index predicate values {}", e);
+ error!("Error evaluating page index predicate values {}", e);
metrics.predicate_evaluation_errors.add(1);
+ return Ok(vec![RowSelector::select(group.num_rows() as
usize)]);
Review Comment:
Cool! I hope to have https://github.com/apache/arrow-datafusion/pull/3976
ready for review later today — with that additional test coverage, I will
feel pretty good about this particular optimization 🎉
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]