alamb commented on code in PR #11483:
URL: https://github.com/apache/datafusion/pull/11483#discussion_r1679347823


##########
datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs:
##########
@@ -103,30 +96,52 @@ use super::metrics::ParquetFileMetrics;
 ///
 /// So we can entirely skip rows 0->199 and 250->299 as we know they
 /// can not contain rows that match the predicate.
+///
+/// # Implementation notes
+///
+/// Single column predicates are evaluated using the PageIndex information
+/// for that column to determine which row ranges can be skipped based.
+///
+/// The resulting [`RowSelection`]'s are combined into a final
+/// row selection that is added to the [`ParquetAccessPlan`].
 #[derive(Debug)]
-pub struct PagePruningPredicate {
+pub struct PagePruningAccessPlanFilter {
+    /// single column predicates (e.g. (`col = 5`) extracted from the overall
+    /// predicate. Must all be true for a row to be included in the result.
     predicates: Vec<PruningPredicate>,
 }
 
-impl PagePruningPredicate {
-    /// Create a new [`PagePruningPredicate`]
-    // TODO: this is infallaible -- it can not return an error
-    pub fn try_new(expr: &Arc<dyn PhysicalExpr>, schema: SchemaRef) -> 
Result<Self> {
+impl PagePruningAccessPlanFilter {
+    /// Create a new [`PagePruningAccessPlanFilter`] from a physical
+    /// expression.
+    pub fn new(expr: &Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Self {
+        // extract any single column predicates
         let predicates = split_conjunction(expr)
             .into_iter()
             .filter_map(|predicate| {
-                match PruningPredicate::try_new(predicate.clone(), 
schema.clone()) {
-                    Ok(p)
-                        if (!p.always_true())
-                            && (p.required_columns().n_columns() < 2) =>
-                    {
-                        Some(Ok(p))
-                    }
-                    _ => None,
+                let pp =

Review Comment:
   I also added a more logging for the cases when predicates can't be used for 
pruning



##########
datafusion/core/src/physical_optimizer/pruning.rs:
##########
@@ -736,12 +738,22 @@ impl RequiredColumns {
         Self::default()
     }
 
-    /// Returns number of unique columns
-    pub(crate) fn n_columns(&self) -> usize {
-        self.iter()
-            .map(|(c, _s, _f)| c)
-            .collect::<HashSet<_>>()
-            .len()
+    /// Returns Some(column) if this is a single column predicate.

Review Comment:
   this was an easier API to work with



##########
datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs:
##########
@@ -378,206 +354,143 @@ fn prune_pages_in_one_row_group(
     Some(RowSelection::from(vec))
 }
 
-fn create_row_count_in_each_page(
-    location: &[PageLocation],
-    num_rows: usize,
-) -> Vec<usize> {
-    let mut vec = Vec::with_capacity(location.len());
-    location.windows(2).for_each(|x| {
-        let start = x[0].first_row_index as usize;
-        let end = x[1].first_row_index as usize;
-        vec.push(end - start);
-    });
-    vec.push(num_rows - location.last().unwrap().first_row_index as usize);
-    vec
-}
-
-/// Wraps one col page_index in one rowGroup statistics in a way
-/// that implements [`PruningStatistics`]
+/// Implement [`PruningStatistics`] for one column's PageIndex (column_index + 
offset_index)
 #[derive(Debug)]
 struct PagesPruningStatistics<'a> {
-    col_page_indexes: &'a Index,
-    col_offset_indexes: &'a Vec<PageLocation>,
-    // target_type means the logical type in schema: like 'DECIMAL' is the 
logical type, but the
-    // real physical type in parquet file may be `INT32, INT64, 
FIXED_LEN_BYTE_ARRAY`
-    target_type: &'a Option<DataType>,
-    num_rows_in_row_group: i64,
+    row_group_index: usize,
+    row_group_metadatas: &'a [RowGroupMetaData],
+    converter: StatisticsConverter<'a>,
+    column_index: &'a ParquetColumnIndex,
+    offset_index: &'a ParquetOffsetIndex,
+    page_offsets: &'a Vec<PageLocation>,
 }
 
-// Extract the min or max value calling `func` from page idex
-macro_rules! get_min_max_values_for_page_index {

Review Comment:
   This code is replaced by `StatisticsConverter` which we have now tested 
quite thoroughly (kudos to @marvinlanhenke and others)



##########
datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs:
##########
@@ -146,52 +161,58 @@ impl PagePruningPredicate {
         }
 
         let page_index_predicates = &self.predicates;
-        let groups = file_metadata.row_groups();
+        let groups = parquet_metadata.row_groups();
 
         if groups.is_empty() {
             return access_plan;
         }
 
-        let (Some(file_offset_indexes), Some(file_page_indexes)) =
-            (file_metadata.offset_index(), file_metadata.column_index())
-        else {
-            trace!(
-                    "skip page pruning due to lack of indexes. Have offset: 
{}, column index: {}",
-                    file_metadata.offset_index().is_some(), 
file_metadata.column_index().is_some()
+        if parquet_metadata.offset_index().is_none()
+            || parquet_metadata.column_index().is_none()
+        {
+            debug!(
+                    "Can not prune pages due to lack of indexes. Have offset: 
{}, column index: {}",
+                    parquet_metadata.offset_index().is_some(), 
parquet_metadata.column_index().is_some()
                 );
             return access_plan;
         };
 
         // track the total number of rows that should be skipped
         let mut total_skip = 0;
 
+        // for each row group specified in the access plan
         let row_group_indexes = access_plan.row_group_indexes();
-        for r in row_group_indexes {
+        for row_group_index in row_group_indexes {

Review Comment:
   I think this now reads easier -- more of the index manipulation is captured 
in `PagesPruningStatistics`



##########
datafusion/core/src/datasource/physical_plan/parquet/mod.rs:
##########
@@ -749,26 +740,6 @@ fn should_enable_page_index(
             .unwrap_or(false)
 }
 
-// Convert parquet column schema to arrow data type, and just consider the

Review Comment:
   This is now handled entirely in the StatisticsConverter



##########
datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs:
##########
@@ -378,206 +354,143 @@ fn prune_pages_in_one_row_group(
     Some(RowSelection::from(vec))
 }
 
-fn create_row_count_in_each_page(

Review Comment:
   Moved into a function on PagesPruningStatistics



##########
datafusion/core/src/datasource/physical_plan/parquet/mod.rs:
##########
@@ -225,7 +223,7 @@ pub struct ParquetExec {
     /// Optional predicate for pruning row groups (derived from `predicate`)
     pruning_predicate: Option<Arc<PruningPredicate>>,
     /// Optional predicate for pruning pages (derived from `predicate`)
-    page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
+    page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,

Review Comment:
   I renamed this to be consistent with what this is -- it isn't a pruning 
predicate per se 



##########
datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs:
##########
@@ -266,105 +287,60 @@ fn update_selection(
     }
 }
 
-/// Returns the column index in the row parquet schema for the single
-/// column of a single column pruning predicate.
-///
-/// For example, give the predicate `y > 5`
+/// Returns a [`RowSelection`] for the rows in this row group to scan.
 ///
-/// And columns in the RowGroupMetadata like `['x', 'y', 'z']` will
-/// return 1.
+/// This Row Selection is formed from the page index and the predicate skips 
row
+/// ranges that can be ruled out based on the predicate.
 ///
-/// Returns `None` if the column is not found, or if there are no
-/// required columns, which is the case for predicate like `abs(i) =
-/// 1` which are rewritten to `lit(true)`
-///
-/// Panics:
-///
-/// If the predicate contains more than one column reference (assumes
-/// that `extract_page_index_push_down_predicates` only returns
-/// predicate with one col)
-fn find_column_index(
-    predicate: &PruningPredicate,
-    arrow_schema: &Schema,
-    parquet_schema: &SchemaDescriptor,
-) -> Option<usize> {
-    let mut found_required_column: Option<&Column> = None;
-
-    for required_column_details in predicate.required_columns().iter() {
-        let column = &required_column_details.0;
-        if let Some(found_required_column) = found_required_column.as_ref() {
-            // make sure it is the same name we have seen previously
-            assert_eq!(
-                column.name(),
-                found_required_column.name(),
-                "Unexpected multi column predicate"
-            );
-        } else {
-            found_required_column = Some(column);
-        }
-    }
-
-    let Some(column) = found_required_column.as_ref() else {
-        trace!("No column references in pruning predicate");
-        return None;
-    };
-
-    parquet_column(parquet_schema, arrow_schema, column.name()).map(|x| x.0)
-}
-
-/// Returns a `RowSelection` for the pages in this RowGroup if any
-/// rows can be pruned based on the page index
+/// Returns `None` if there is an error evaluating the predicate or the 
required
+/// page information is not present.
 fn prune_pages_in_one_row_group(
-    group: &RowGroupMetaData,
-    predicate: &PruningPredicate,
-    col_offset_indexes: Option<&Vec<PageLocation>>,
-    col_page_indexes: Option<&Index>,
-    col_desc: &ColumnDescriptor,
+    row_group_index: usize,
+    pruning_predicate: &PruningPredicate,
+    converter: StatisticsConverter<'_>,
+    parquet_metadata: &ParquetMetaData,
     metrics: &ParquetFileMetrics,
 ) -> Option<RowSelection> {
-    let num_rows = group.num_rows() as usize;
-    let (Some(col_offset_indexes), Some(col_page_indexes)) =
-        (col_offset_indexes, col_page_indexes)
-    else {
-        return None;
-    };
-
-    let target_type = parquet_to_arrow_decimal_type(col_desc);
-    let pruning_stats = PagesPruningStatistics {
-        col_page_indexes,
-        col_offset_indexes,
-        target_type: &target_type,
-        num_rows_in_row_group: group.num_rows(),
-    };
+    let pruning_stats =
+        PagesPruningStatistics::try_new(row_group_index, converter, 
parquet_metadata)?;
 
-    let values = match predicate.prune(&pruning_stats) {
+    // Each element in values is a boolean indicating whether the page may have
+    // values that match the predicate (true) or could not possibly have values
+    // that match the predicate (false).
+    let values = match pruning_predicate.prune(&pruning_stats) {
         Ok(values) => values,
         Err(e) => {
-            // stats filter array could not be built
-            // return a result which will not filter out any pages
             debug!("Error evaluating page index predicate values {e}");
             metrics.predicate_evaluation_errors.add(1);
             return None;
         }
     };
 
+    // Convert the information of which pages to skip into a RowSelection
+    // that describes the ranges of rows to skip.
+    let Some(page_row_counts) = pruning_stats.page_row_counts() else {

Review Comment:
   I renamed `row_vec` to `page_row_counts` to make the logic clearer, and also 
added logging when it wasn't possible to construct



##########
datafusion/core/src/datasource/physical_plan/parquet/statistics.rs:
##########
@@ -1136,6 +1136,16 @@ pub struct StatisticsConverter<'a> {
 }
 
 impl<'a> StatisticsConverter<'a> {
+    /// Return the index of the column in the parquet file, if any

Review Comment:
   These are two new APIs I found I needed to add to the statistics converter 
API that is being ported upstream from @efredine in 
https://github.com/apache/arrow-rs/pull/6046 (I'll do so later today)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to