This is an automated email from the ASF dual-hosted git repository.

houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 03075d5  Fix null comparison for Parquet pruning predicate (#1595)
03075d5 is described below

commit 03075d5f4b3fdfd8f82144fcd409418832a4bf69
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Thu Jan 20 23:17:50 2022 -0800

    Fix null comparison for Parquet pruning predicate (#1595)
    
    * Fix null comparison for Parquet pruning predicate
    
    * Use IsNull for null count predicate pruning
---
 datafusion/src/physical_optimizer/pruning.rs       | 66 ++++++++++++++-
 .../src/physical_plan/file_format/parquet.rs       | 96 ++++++++++++++++++----
 2 files changed, 143 insertions(+), 19 deletions(-)

diff --git a/datafusion/src/physical_optimizer/pruning.rs 
b/datafusion/src/physical_optimizer/pruning.rs
index 5f167e2..22b854b 100644
--- a/datafusion/src/physical_optimizer/pruning.rs
+++ b/datafusion/src/physical_optimizer/pruning.rs
@@ -37,6 +37,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 
+use crate::prelude::lit;
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
@@ -75,6 +76,12 @@ pub trait PruningStatistics {
     /// return the number of containers (e.g. row groups) being
     /// pruned with these statistics
     fn num_containers(&self) -> usize;
+
+    /// return the number of null values for the named column as an
+    /// `Option<UInt64Array>`.
+    ///
+    /// Note: the returned array must contain `num_containers()` rows.
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
 }
 
 /// Evaluates filter expressions on statistics in order to
@@ -200,7 +207,7 @@ impl PruningPredicate {
 struct RequiredStatColumns {
     /// The statistics required to evaluate this predicate:
     /// * The unqualified column in the input schema
-    /// * Statistics type (e.g. Min or Max)
+    /// * Statistics type (e.g. Min or Max or Null_Count)
     /// * The field the statistics value should be placed in for
     ///   pruning predicate evaluation
     columns: Vec<(Column, StatisticsType, Field)>,
@@ -281,6 +288,22 @@ impl RequiredStatColumns {
     ) -> Result<Expr> {
         self.stat_column_expr(column, column_expr, field, StatisticsType::Max, 
"max")
     }
+
+    /// rewrite col --> col_null_count
+    fn null_count_column_expr(
+        &mut self,
+        column: &Column,
+        column_expr: &Expr,
+        field: &Field,
+    ) -> Result<Expr> {
+        self.stat_column_expr(
+            column,
+            column_expr,
+            field,
+            StatisticsType::NullCount,
+            "null_count",
+        )
+    }
 }
 
 impl From<Vec<(Column, StatisticsType, Field)>> for RequiredStatColumns {
@@ -329,6 +352,7 @@ fn build_statistics_record_batch<S: PruningStatistics>(
         let array = match statistics_type {
             StatisticsType::Min => statistics.min_values(column),
             StatisticsType::Max => statistics.max_values(column),
+            StatisticsType::NullCount => statistics.null_counts(column),
         };
         let array = array.unwrap_or_else(|| new_null_array(data_type, 
num_containers));
 
@@ -582,6 +606,32 @@ fn build_single_column_expr(
     }
 }
 
+/// Given an expression reference to `expr`, if `expr` is a column expression,
+/// returns a pruning expression in terms of IsNull that will evaluate to true
+/// if the column may contain null, and false if definitely does not
+/// contain null.
+fn build_is_null_column_expr(
+    expr: &Expr,
+    schema: &Schema,
+    required_columns: &mut RequiredStatColumns,
+) -> Option<Expr> {
+    match expr {
+        Expr::Column(ref col) => {
+            let field = schema.field_with_name(&col.name).ok()?;
+
+            let null_count_field = &Field::new(field.name(), DataType::UInt64, 
false);
+            required_columns
+                .null_count_column_expr(col, expr, null_count_field)
+                .map(|null_count_column_expr| {
+                    // IsNull(column) => null_count > 0
+                    null_count_column_expr.gt(lit::<u64>(0))
+                })
+                .ok()
+        }
+        _ => None,
+    }
+}
+
 /// Translate logical filter expression into pruning predicate
 /// expression that will evaluate to FALSE if it can be determined no
 /// rows between the min/max values could pass the predicates.
@@ -602,6 +652,11 @@ fn build_predicate_expression(
     // predicate expression can only be a binary expression
     let (left, op, right) = match expr {
         Expr::BinaryExpr { left, op, right } => (left, *op, right),
+        Expr::IsNull(expr) => {
+            let expr = build_is_null_column_expr(expr, schema, 
required_columns)
+                .unwrap_or(unhandled);
+            return Ok(expr);
+        }
         Expr::Column(col) => {
             let expr = build_single_column_expr(col, schema, required_columns, 
false)
                 .unwrap_or(unhandled);
@@ -702,6 +757,7 @@ fn build_statistics_expr(expr_builder: &mut 
PruningExpressionBuilder) -> Result<
 enum StatisticsType {
     Min,
     Max,
+    NullCount,
 }
 
 #[cfg(test)]
@@ -812,6 +868,10 @@ mod tests {
                 .map(|container_stats| container_stats.len())
                 .unwrap_or(0)
         }
+
+        fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+            None
+        }
     }
 
     /// Returns the specified min/max container values
@@ -833,6 +893,10 @@ mod tests {
         fn num_containers(&self) -> usize {
             self.num_containers
         }
+
+        fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+            None
+        }
     }
 
     #[test]
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs 
b/datafusion/src/physical_plan/file_format/parquet.rs
index 17abb43..5768d0a 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -341,6 +341,31 @@ macro_rules! get_min_max_values {
     }}
 }
 
+// Extract the null count value on the ParquetStatistics
+macro_rules! get_null_count_values {
+    ($self:expr, $column:expr) => {{
+        let column_index =
+            if let Some((v, _)) = 
$self.parquet_schema.column_with_name(&$column.name) {
+                v
+            } else {
+                // Named column was not present
+                return None;
+            };
+
+        let scalar_values: Vec<ScalarValue> = $self
+            .row_group_metadata
+            .iter()
+            .flat_map(|meta| meta.column(column_index).statistics())
+            .map(|stats| {
+                
ScalarValue::UInt64(Some(stats.null_count().try_into().unwrap()))
+            })
+            .collect();
+
+        // ignore errors converting to arrays (e.g. different types)
+        ScalarValue::iter_to_array(scalar_values).ok()
+    }};
+}
+
 impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
         get_min_max_values!(self, column, min, min_bytes)
@@ -353,6 +378,10 @@ impl<'a> PruningStatistics for 
RowGroupPruningStatistics<'a> {
     fn num_containers(&self) -> usize {
         self.row_group_metadata.len()
     }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        get_null_count_values!(self, column)
+    }
 }
 
 fn build_row_group_predicate(
@@ -710,21 +739,7 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn row_group_pruning_predicate_null_expr() -> Result<()> {
-        use crate::logical_plan::{col, lit};
-        // test row group predicate with an unknown (Null) expr
-        //
-        // int > 1 and bool = NULL => c1_max > 1 and null
-        let expr = col("c1")
-            .gt(lit(15))
-            .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("c1", DataType::Int32, false),
-            Field::new("c2", DataType::Boolean, false),
-        ]));
-        let pruning_predicate = PruningPredicate::try_new(&expr, schema)?;
-
+    fn gen_row_group_meta_data_for_pruning_predicate() -> 
Vec<RowGroupMetaData> {
         let schema_descr = get_test_schema_descr(vec![
             ("c1", PhysicalType::INT32),
             ("c2", PhysicalType::BOOLEAN),
@@ -740,10 +755,56 @@ mod tests {
             &schema_descr,
             vec![
                 ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
-                ParquetStatistics::boolean(Some(false), Some(true), None, 0, 
false),
+                ParquetStatistics::boolean(Some(false), Some(true), None, 1, 
false),
             ],
         );
-        let row_group_metadata = vec![rgm1, rgm2];
+        vec![rgm1, rgm2]
+    }
+
+    #[test]
+    fn row_group_pruning_predicate_null_expr() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        // int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0
+        let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Boolean, false),
+        ]));
+        let pruning_predicate = PruningPredicate::try_new(&expr, schema)?;
+        let row_group_metadata = 
gen_row_group_meta_data_for_pruning_predicate();
+
+        let row_group_predicate = build_row_group_predicate(
+            &pruning_predicate,
+            parquet_file_metrics(),
+            &row_group_metadata,
+        );
+        let row_group_filter = row_group_metadata
+            .iter()
+            .enumerate()
+            .map(|(i, g)| row_group_predicate(g, i))
+            .collect::<Vec<_>>();
+        // First row group was filtered out because it contains no null value 
on "c2".
+        assert_eq!(row_group_filter, vec![false, true]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn row_group_pruning_predicate_eq_null_expr() -> Result<()> {
+        use crate::logical_plan::{col, lit};
+        // test row group predicate with an unknown (Null) expr
+        //
+        // int > 1 and bool = NULL => c1_max > 1 and null
+        let expr = col("c1")
+            .gt(lit(15))
+            .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Boolean, false),
+        ]));
+        let pruning_predicate = PruningPredicate::try_new(&expr, schema)?;
+        let row_group_metadata = 
gen_row_group_meta_data_for_pruning_predicate();
+
         let row_group_predicate = build_row_group_predicate(
             &pruning_predicate,
             parquet_file_metrics(),
@@ -756,7 +817,6 @@ mod tests {
             .collect::<Vec<_>>();
         // no row group is filtered out because the predicate expression can't 
be evaluated
         // when a null array is generated for a statistics column,
-        // because the null values propagate to the end result, making the 
predicate result undefined
         assert_eq!(row_group_filter, vec![true, true]);
 
         Ok(())

Reply via email to