alamb commented on code in PR #3967:
URL: https://github.com/apache/arrow-datafusion/pull/3967#discussion_r1010674223
##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -1021,72 +1120,57 @@ fn prune_row_groups(
fn prune_pages_in_one_row_group(
group: &RowGroupMetaData,
- predicate: Option<PruningPredicate>,
- offset_indexes: Option<&Vec<Vec<PageLocation>>>,
- page_indexes: Option<&Vec<Index>>,
+ predicate: &PruningPredicate,
+ col_offset_indexes: Option<&Vec<PageLocation>>,
+ col_page_indexes: Option<&Index>,
metrics: &ParquetFileMetrics,
) -> Result<Vec<RowSelector>> {
let num_rows = group.num_rows() as usize;
- if let (Some(predicate), Some(offset_indexes), Some(page_indexes)) =
- (&predicate, offset_indexes, page_indexes)
+ if let (Some(col_offset_indexes), Some(col_page_indexes)) =
+ (col_offset_indexes, col_page_indexes)
{
let pruning_stats = PagesPruningStatistics {
- page_indexes,
- offset_indexes,
- parquet_schema: predicate.schema().as_ref(),
- // now we assume only support one col.
- col_id: *predicate
- .need_input_columns_ids()
- .iter()
- .take(1)
- .next()
- .unwrap(),
+ col_page_indexes,
+ col_offset_indexes,
};
match predicate.prune(&pruning_stats) {
Ok(values) => {
let mut vec = Vec::with_capacity(values.len());
- if let Some(cols_offset_indexes) =
- offset_indexes.get(pruning_stats.col_id)
- {
- let row_vec =
- create_row_count_in_each_page(cols_offset_indexes,
num_rows);
- assert_eq!(row_vec.len(), values.len());
- let mut sum_row = *row_vec.first().unwrap();
- let mut selected = *values.first().unwrap();
-
- for (i, &f) in values.iter().skip(1).enumerate() {
- if f == selected {
- sum_row += *row_vec.get(i).unwrap();
+ let row_vec =
create_row_count_in_each_page(col_offset_indexes, num_rows);
+ assert_eq!(row_vec.len(), values.len());
+ let mut sum_row = *row_vec.first().unwrap();
+ let mut selected = *values.first().unwrap();
+
+ for (i, &f) in values.iter().skip(1).enumerate() {
+ if f == selected {
+ sum_row += *row_vec.get(i).unwrap();
+ } else {
+ let selector = if selected {
+ RowSelector::select(sum_row)
} else {
- let selector = if selected {
- RowSelector::select(sum_row)
- } else {
- RowSelector::skip(sum_row)
- };
- vec.push(selector);
- sum_row = *row_vec.get(i).unwrap();
- selected = f;
- }
+ RowSelector::skip(sum_row)
+ };
+ vec.push(selector);
+ sum_row = *row_vec.get(i).unwrap();
+ selected = f;
}
+ }
- let selector = if selected {
- RowSelector::select(sum_row)
- } else {
- RowSelector::skip(sum_row)
- };
- vec.push(selector);
- return Ok(vec);
+ let selector = if selected {
+ RowSelector::select(sum_row)
} else {
- debug!("Error evaluating page index predicate values
missing page index col_id is{}", pruning_stats.col_id);
- metrics.predicate_evaluation_errors.add(1);
- }
+ RowSelector::skip(sum_row)
+ };
+ vec.push(selector);
+ return Ok(vec);
}
// stats filter array could not be built
- // return a closure which will not filter out any row groups
+ // return a result which will not filter out any pages
Err(e) => {
- debug!("Error evaluating page index predicate values {}", e);
+ error!("Error evaluating page index predicate values {}", e);
metrics.predicate_evaluation_errors.add(1);
+ return Ok(vec![RowSelector::select(group.num_rows() as
usize)]);
Review Comment:
I verified that this PR fixes #4002. I have also prepared a PR that enables
page filtering in the integration test
(https://github.com/apache/arrow-datafusion/pull/4062), which we can run once
this PR is merged.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]