yordan-pavlov commented on a change in pull request #9064:
URL: https://github.com/apache/arrow/pull/9064#discussion_r551587504



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -209,6 +251,477 @@ impl ParquetExec {
     }
 }
 
+#[derive(Debug, Clone)]
+/// Predicate builder used for generating of predicate functions, used to filter row group metadata
+pub struct PredicateExpressionBuilder {
+    parquet_schema: Schema,
+    predicate_expr: Arc<dyn PhysicalExpr>,
+    stat_column_req: Vec<(String, StatisticsType, Field)>,
+}
+
+impl PredicateExpressionBuilder {
+    /// Try to create a new instance of PredicateExpressionBuilder
+    pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result<Self> {
+        // build predicate expression once
+        let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new();
+        let predicate_expr =
+            build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?;
+
+        Ok(Self {
+            parquet_schema,
+            predicate_expr,
+            stat_column_req,
+        })
+    }
+
+    /// Generate a predicate function used to filter row group metadata
+    pub fn build_row_group_predicate(
+        &self,
+        row_group_metadata: &[RowGroupMetaData],
+    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+        // build statistics record batch
+        let predicate_result = build_row_group_record_batch(
+            row_group_metadata,
+            &self.parquet_schema,
+            &self.stat_column_req,
+        )
+        .and_then(|statistics_batch| {
+            // execute predicate expression
+            self.predicate_expr.evaluate(&statistics_batch)
+        })
+        .and_then(|v| match v {
+            ColumnarValue::Array(array) => Ok(array),
+            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
+                "predicate expression didn't return an array".to_string(),
+            )),
+        });
+
+        let predicate_array = match predicate_result {
+            Ok(array) => array,
+            _ => return Box::new(|_r, _i| true),
+        };
+
+        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
+        match predicate_array {
+            // return row group predicate function
+            Some(array) => {
+                let predicate_values =
+                    array.iter().map(|x| x.unwrap_or(false)).collect::<Vec<_>>();
+                Box::new(move |_, i| predicate_values[i])
+            }
+            // predicate result is not a BooleanArray
+            _ => Box::new(|_r, _i| true),

Review comment:
       My thinking in designing this was that pushing the predicate down to parquet is an optimization, not a requirement: if it fails, the query still produces correct results, just more slowly. For that reason the code avoids panicking and, on any failure, returns a predicate that always returns `true`. That predicate filters out no row groups and leaves all of them to be processed by downstream operators.
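   A minimal sketch of this fail-open behaviour, using plain Rust types instead of Arrow's `BooleanArray`. The function `row_group_filter` and the `PredicateResult` alias are hypothetical stand-ins: the input is assumed to be the outcome of evaluating the predicate against the row-group statistics (one `bool` per row group, or an error), and the output is a closure deciding whether row group `i` should be read.

```rust
/// Hypothetical stand-in for the result of evaluating the pruning predicate
/// against a batch of row-group statistics: one entry per row group.
type PredicateResult = Result<Vec<bool>, String>;

/// Build a row-group filter that "fails open": if the predicate could not be
/// evaluated, every row group is kept and left to downstream operators.
fn row_group_filter(result: PredicateResult) -> Box<dyn Fn(usize) -> bool> {
    match result {
        // Predicate evaluated: keep row group `i` only if its entry is true.
        // Out-of-range indices are kept instead of panicking.
        Ok(values) => Box::new(move |i| values.get(i).copied().unwrap_or(true)),
        // Any error: filter nothing; the query is still correct, just slower.
        Err(_) => Box::new(|_| true),
    }
}

fn main() {
    let pruned = row_group_filter(Ok(vec![true, false, true]));
    let kept: Vec<usize> = (0..3).filter(|&i| pruned(i)).collect();
    println!("{:?}", kept); // [0, 2]

    let fallback = row_group_filter(Err("unsupported expression".to_string()));
    let kept: Vec<usize> = (0..3).filter(|&i| fallback(i)).collect();
    println!("{:?}", kept); // [0, 1, 2]
}
```

   Using `values.get(i)` rather than indexing directly (as `predicate_values[i]` does in the diff) extends the same no-panic policy to an unexpected row-group index.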
   It is even possible to end up with a partial predicate expression. When multiple conditions are joined with a logical `AND` and some of them can't be translated into physical expressions, those conditions are replaced by `true`, but the rest are still evaluated and can still filter out some row groups.
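   A minimal sketch of the partial-predicate idea, assuming a simplified, hypothetical `Expr` enum and a per-column `(min, max)` statistics map; these are not DataFusion's `Expr` or `PhysicalExpr` types. `to_pruning_expr` replaces untranslatable conjuncts with `True`, and `may_match` returns `false` only when the statistics prove that no row in the row group can match.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified expression tree (not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Gt(String, i64),     // `column > value`
    Lt(String, i64),     // `column < value`
    Unsupported(String), // something that cannot be translated, e.g. a UDF call
    And(Box<Expr>, Box<Expr>),
    True,
}

/// Per-column (min, max) statistics for one row group.
type Stats = HashMap<String, (i64, i64)>;

/// Rewrite a filter into a pruning predicate: conjuncts that cannot be
/// translated become `True`, the rest are kept as-is.
fn to_pruning_expr(expr: &Expr) -> Expr {
    match expr {
        Expr::And(l, r) => Expr::And(Box::new(to_pruning_expr(l)), Box::new(to_pruning_expr(r))),
        Expr::Unsupported(_) => Expr::True,
        other => other.clone(),
    }
}

/// Returns false only when the statistics prove that no row can match.
fn may_match(expr: &Expr, stats: &Stats) -> bool {
    match expr {
        // Some row can satisfy `col > v` only if max > v; missing stats keep the group.
        Expr::Gt(c, v) => stats.get(c).map_or(true, |&(_, max)| max > *v),
        // Some row can satisfy `col < v` only if min < v.
        Expr::Lt(c, v) => stats.get(c).map_or(true, |&(min, _)| min < *v),
        Expr::And(l, r) => may_match(l, stats) && may_match(r, stats),
        // `True` and anything untranslated never prune.
        Expr::True | Expr::Unsupported(_) => true,
    }
}

fn main() {
    // filter: a > 10 AND my_udf(b)
    let filter = Expr::And(
        Box::new(Expr::Gt("a".to_string(), 10)),
        Box::new(Expr::Unsupported("my_udf(b)".to_string())),
    );
    let pruning = to_pruning_expr(&filter);

    let rg0: Stats = vec![("a".to_string(), (0, 5))].into_iter().collect();
    let rg1: Stats = vec![("a".to_string(), (8, 20))].into_iter().collect();
    // Row group 0 is skipped by `a > 10` alone; row group 1 is read.
    println!("{} {}", may_match(&pruning, &rg0), may_match(&pruning, &rg1)); // false true
}
```

   Replacing an untranslatable condition with `true` is conservative because it sits under `AND`: it can only make the predicate keep more row groups, never fewer. Under a negation it would turn into `false` and could prune row groups that do contain matching rows, so a rewrite like this must not descend through `NOT`.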




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

