This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ed374678d1 Prune columns are all null in ParquetExec by row_counts , 
handle IS NOT NULL (#9989)
ed374678d1 is described below

commit ed374678d1beac56d39e673eb0edb78f34458f68
Author: Yang Jiang <[email protected]>
AuthorDate: Thu Apr 11 00:48:49 2024 +0800

    Prune columns are all null in ParquetExec by row_counts , handle IS NOT 
NULL (#9989)
    
    * Prune columns are all null in ParquetExec by row_counts in pruning 
statistics
    
    * fix clippy
    
    * Update datafusion/core/tests/parquet/row_group_pruning.rs
    
    Co-authored-by: Ruihang Xia <[email protected]>
    
    * fix comment and support isNotNull
    
    * add test
    
    * fix conflict
    
    ---------
    
    Co-authored-by: Ruihang Xia <[email protected]>
---
 .../datasource/physical_plan/parquet/row_groups.rs | 10 ++--
 datafusion/core/src/physical_optimizer/pruning.rs  | 38 +++++++++++---
 datafusion/core/tests/parquet/mod.rs               | 30 +++++++++++
 datafusion/core/tests/parquet/row_group_pruning.rs | 60 ++++++++++++++++++++++
 4 files changed, 128 insertions(+), 10 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 6600dd07d7..2b96659548 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -338,8 +338,10 @@ impl<'a> PruningStatistics for 
RowGroupPruningStatistics<'a> {
         scalar.to_array().ok()
     }
 
-    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
-        None
+    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
+        let (c, _) = self.column(&column.name)?;
+        let scalar = ScalarValue::UInt64(Some(c.num_values() as u64));
+        scalar.to_array().ok()
     }
 
     fn contained(
@@ -1022,15 +1024,17 @@ mod tests {
         column_statistics: Vec<ParquetStatistics>,
     ) -> RowGroupMetaData {
         let mut columns = vec![];
+        let number_row = 1000;
         for (i, s) in column_statistics.iter().enumerate() {
             let column = ColumnChunkMetaData::builder(schema_descr.column(i))
                 .set_statistics(s.clone())
+                .set_num_values(number_row)
                 .build()
                 .unwrap();
             columns.push(column);
         }
         RowGroupMetaData::builder(schema_descr.clone())
-            .set_num_rows(1000)
+            .set_num_rows(number_row)
             .set_total_byte_size(2000)
             .set_column_metadata(columns)
             .build()
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs 
b/datafusion/core/src/physical_optimizer/pruning.rs
index dc7e0529de..ebb811408f 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -335,6 +335,7 @@ pub trait PruningStatistics {
 /// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 
END`
 /// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE 
x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN 
false ELSE y_min <= 10 AND 10 <= y_max END`
 /// `x IS NULL`  | `x_null_count > 0`
+/// `x IS NOT NULL`  | `x_null_count = 0`
 /// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false 
ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END`
 ///
 /// ## Predicate Evaluation
@@ -1239,10 +1240,15 @@ fn build_single_column_expr(
 /// returns a pruning expression in terms of IsNull that will evaluate to true
 /// if the column may contain null, and false if definitely does not
 /// contain null.
+/// If `with_not` is set to true (meaning `IS NOT NULL`):
+/// Given an expression reference to `expr`, if `expr` is a column expression,
+/// returns a pruning expression in terms of IsNotNull that will evaluate to true
+/// if the column does not contain any null values, and false if it definitely
+/// contains null values.
 fn build_is_null_column_expr(
     expr: &Arc<dyn PhysicalExpr>,
     schema: &Schema,
     required_columns: &mut RequiredColumns,
+    with_not: bool,
 ) -> Option<Arc<dyn PhysicalExpr>> {
     if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
         let field = schema.field_with_name(col.name()).ok()?;
@@ -1251,12 +1257,21 @@ fn build_is_null_column_expr(
         required_columns
             .null_count_column_expr(col, expr, null_count_field)
             .map(|null_count_column_expr| {
-                // IsNull(column) => null_count > 0
-                Arc::new(phys_expr::BinaryExpr::new(
-                    null_count_column_expr,
-                    Operator::Gt,
-                    
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
-                )) as _
+                if with_not {
+                    // IsNotNull(column) => null_count = 0
+                    Arc::new(phys_expr::BinaryExpr::new(
+                        null_count_column_expr,
+                        Operator::Eq,
+                        
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
+                    )) as _
+                } else {
+                    // IsNull(column) => null_count > 0
+                    Arc::new(phys_expr::BinaryExpr::new(
+                        null_count_column_expr,
+                        Operator::Gt,
+                        
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
+                    )) as _
+                }
             })
             .ok()
     } else {
@@ -1287,9 +1302,18 @@ fn build_predicate_expression(
     // predicate expression can only be a binary expression
     let expr_any = expr.as_any();
     if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
-        return build_is_null_column_expr(is_null.arg(), schema, 
required_columns)
+        return build_is_null_column_expr(is_null.arg(), schema, 
required_columns, false)
             .unwrap_or(unhandled);
     }
+    if let Some(is_not_null) = 
expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
+        return build_is_null_column_expr(
+            is_not_null.arg(),
+            schema,
+            required_columns,
+            true,
+        )
+        .unwrap_or(unhandled);
+    }
     if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
         return build_single_column_expr(col, schema, required_columns, false)
             .unwrap_or(unhandled);
diff --git a/datafusion/core/tests/parquet/mod.rs 
b/datafusion/core/tests/parquet/mod.rs
index b4415d638a..f36afe1976 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -28,6 +28,7 @@ use arrow::{
     record_batch::RecordBatch,
     util::pretty::pretty_format_batches,
 };
+use arrow_array::new_null_array;
 use chrono::{Datelike, Duration, TimeDelta};
 use datafusion::{
     datasource::{physical_plan::ParquetExec, provider_as_source, 
TableProvider},
@@ -75,6 +76,7 @@ enum Scenario {
     DecimalLargePrecisionBloomFilter,
     ByteArray,
     PeriodsInColumnNames,
+    WithNullValues,
 }
 
 enum Unit {
@@ -630,6 +632,27 @@ fn make_names_batch(name: &str, service_name_values: 
Vec<&str>) -> RecordBatch {
     RecordBatch::try_new(schema, vec![Arc::new(name), 
Arc::new(service_name)]).unwrap()
 }
 
+/// Return record batch with i8, i16, i32, and i64 sequences with all Null 
values
+fn make_all_null_values() -> RecordBatch {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("i8", DataType::Int8, true),
+        Field::new("i16", DataType::Int16, true),
+        Field::new("i32", DataType::Int32, true),
+        Field::new("i64", DataType::Int64, true),
+    ]));
+
+    RecordBatch::try_new(
+        schema,
+        vec![
+            new_null_array(&DataType::Int8, 5),
+            new_null_array(&DataType::Int16, 5),
+            new_null_array(&DataType::Int32, 5),
+            new_null_array(&DataType::Int64, 5),
+        ],
+    )
+    .unwrap()
+}
+
 fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
     match scenario {
         Scenario::Timestamps => {
@@ -799,6 +822,13 @@ fn create_data_batch(scenario: Scenario) -> 
Vec<RecordBatch> {
                 ),
             ]
         }
+        Scenario::WithNullValues => {
+            vec![
+                make_all_null_values(),
+                make_int_batches(1, 6),
+                make_all_null_values(),
+            ]
+        }
     }
 }
 
diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs 
b/datafusion/core/tests/parquet/row_group_pruning.rs
index 8fc7936552..29bf1ef0a8 100644
--- a/datafusion/core/tests/parquet/row_group_pruning.rs
+++ b/datafusion/core/tests/parquet/row_group_pruning.rs
@@ -1262,3 +1262,63 @@ async fn prune_periods_in_column_names() {
         .test_row_group_prune()
         .await;
 }
+
+#[tokio::test]
+async fn test_row_group_with_null_values() {
+    // Three row groups:
+    // 1. all Null values
+    // 2. values from 1 to 5
+    // 3. all Null values
+
+    // After pruning, only row group 2 should be selected
+    RowGroupPruningTest::new()
+        .with_scenario(Scenario::WithNullValues)
+        .with_query("SELECT * FROM t WHERE \"i8\" <= 5")
+        .with_expected_errors(Some(0))
+        .with_matched_by_stats(Some(1))
+        .with_pruned_by_stats(Some(2))
+        .with_expected_rows(5)
+        .with_matched_by_bloom_filter(Some(0))
+        .with_pruned_by_bloom_filter(Some(0))
+        .test_row_group_prune()
+        .await;
+
+    // After pruning, only row group 1,3 should be selected
+    RowGroupPruningTest::new()
+        .with_scenario(Scenario::WithNullValues)
+        .with_query("SELECT * FROM t WHERE \"i8\" is Null")
+        .with_expected_errors(Some(0))
+        .with_matched_by_stats(Some(2))
+        .with_pruned_by_stats(Some(1))
+        .with_expected_rows(10)
+        .with_matched_by_bloom_filter(Some(0))
+        .with_pruned_by_bloom_filter(Some(0))
+        .test_row_group_prune()
+        .await;
+
+    // After pruning, only row group 2 should be selected
+    RowGroupPruningTest::new()
+        .with_scenario(Scenario::WithNullValues)
+        .with_query("SELECT * FROM t WHERE \"i16\" is Not Null")
+        .with_expected_errors(Some(0))
+        .with_matched_by_stats(Some(1))
+        .with_pruned_by_stats(Some(2))
+        .with_expected_rows(5)
+        .with_matched_by_bloom_filter(Some(0))
+        .with_pruned_by_bloom_filter(Some(0))
+        .test_row_group_prune()
+        .await;
+
+    // All row groups will be pruned
+    RowGroupPruningTest::new()
+        .with_scenario(Scenario::WithNullValues)
+        .with_query("SELECT * FROM t WHERE \"i32\" > 7")
+        .with_expected_errors(Some(0))
+        .with_matched_by_stats(Some(0))
+        .with_pruned_by_stats(Some(3))
+        .with_expected_rows(0)
+        .with_matched_by_bloom_filter(Some(0))
+        .with_pruned_by_bloom_filter(Some(0))
+        .test_row_group_prune()
+        .await;
+}

Reply via email to