alamb commented on a change in pull request #380:
URL: https://github.com/apache/arrow-datafusion/pull/380#discussion_r637952920



##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -85,94 +115,132 @@ impl PruningPredicateBuilder {
         })
     }
 
-    /// Generate a predicate function used to filter based on
-    /// statistics
+    /// For each set of statistics, evaluates the pruning predicate
+    /// and returns a `bool` with the following meaning for
+    /// all rows whose values match the statistics:
     ///
-    /// This function takes a slice of statistics as parameter, so
-    /// that DataFusion's physical expressions can be executed once
-    /// against a single RecordBatch, containing statistics arrays, on
-    /// which the physical predicate expression is executed to
-    /// generate a row group filter array.
+    /// `true`: There MAY be rows that match the predicate
     ///
-    /// The generated filter function is then used in the returned
-    /// closure to filter row groups. NOTE this is parquet specific at the 
moment
-    pub fn build_pruning_predicate(
-        &self,
-        row_group_metadata: &[RowGroupMetaData],
-    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+    /// `false`: There are no rows that could match the predicate
+    ///
+    /// Note this function takes a slice of statistics as a parameter
+    /// to amortize the cost of the evaluation of the predicate
+    /// against a single record batch.
+    pub fn prune<S: PruningStatistics>(&self, statistics: &[S]) -> 
Result<Vec<bool>> {
         // build statistics record batch
-        let predicate_result = build_statistics_record_batch(
-            row_group_metadata,
-            &self.schema,
-            &self.stat_column_req,
-        )
-        .and_then(|statistics_batch| {
-            // execute predicate expression
-            self.predicate_expr.evaluate(&statistics_batch)
-        })
-        .and_then(|v| match v {
-            ColumnarValue::Array(array) => Ok(array),
-            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
-                "predicate expression didn't return an array".to_string(),
-            )),
-        });
-
-        let predicate_array = match predicate_result {
-            Ok(array) => array,
-            // row group filter array could not be built
-            // return a closure which will not filter out any row groups
-            _ => return Box::new(|_r, _i| true),
-        };
+        let predicate_array =
+            build_statistics_record_batch(statistics, &self.stat_column_req)
+                .and_then(|statistics_batch| {
+                    // execute predicate expression
+                    self.predicate_expr.evaluate(&statistics_batch)
+                })
+                .and_then(|v| match v {
+                    ColumnarValue::Array(array) => Ok(array),
+                    ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
+                        "predicate expression didn't return an 
array".to_string(),
+                    )),
+                })?;
+
+        let predicate_array = predicate_array
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "Expected pruning predicate evaluation to be BooleanArray, 
\
+                     but was {:?}",
+                    predicate_array
+                ))
+            })?;
+
+        // when the result of the predicate expression for a row group is null 
/ undefined,
+        // e.g. due to missing statistics, this row group can't be filtered 
out,
+        // so replace with true
+        Ok(predicate_array
+            .into_iter()
+            .map(|x| x.unwrap_or(true))
+            .collect::<Vec<_>>())
+    }
 
-        let predicate_array = 
predicate_array.as_any().downcast_ref::<BooleanArray>();
-        match predicate_array {
-            // return row group predicate function
-            Some(array) => {
-                // when the result of the predicate expression for a row group 
is null / undefined,
-                // e.g. due to missing statistics, this row group can't be 
filtered out,
-                // so replace with true
-                let predicate_values =
-                    array.iter().map(|x| 
x.unwrap_or(true)).collect::<Vec<_>>();
-                Box::new(move |_, i| predicate_values[i])
-            }
-            // predicate result is not a BooleanArray
-            // return a closure which will not filter out any row groups
-            _ => Box::new(|_r, _i| true),
-        }
+    /// Return a reference to the input schema
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
     }
 }
 
-/// Build a RecordBatch from a list of statistics (currently parquet
-/// [`RowGroupMetadata`] structs), creating arrays, one for each
-/// statistics column, as requested in the stat_column_req parameter.
-fn build_statistics_record_batch(
-    row_groups: &[RowGroupMetaData],
-    schema: &Schema,
+/// Build a RecordBatch from a list of statistics, creating arrays,
+/// with one row for each PruningStatistics and columns specified in
+/// the stat_column_req parameter.
+///
+/// For example, if the requested columns are
+/// ```text
+/// ("s1", Min, Field:s1_min)
+/// ("s2", Max, field:s2_maxx)
+///```
+///
+/// And the input statistics had
+/// ```text
+/// S1(Min: 5, Max: 10)
+/// S2(Min: 99, Max: 1000)
+/// S3(Min: 1, Max: 2)
+/// ```
+///
+/// Then this function would build a record batch with 2 columns and
+/// one row s1_min and s2_maxx as follows (s3 is not requested):
+///
+/// ```text
+/// s1_min | s2_maxx
+/// -------+--------
+///   5    | 10

Review comment:
       oh, yes -- that is a great catch




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to