This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ed374678d1 Prune columns are all null in ParquetExec by row_counts ,
handle IS NOT NULL (#9989)
ed374678d1 is described below
commit ed374678d1beac56d39e673eb0edb78f34458f68
Author: Yang Jiang <[email protected]>
AuthorDate: Thu Apr 11 00:48:49 2024 +0800
Prune columns are all null in ParquetExec by row_counts , handle IS NOT
NULL (#9989)
* Prune columns are all null in ParquetExec by row_counts in pruning
statistics
* fix clippy
* Update datafusion/core/tests/parquet/row_group_pruning.rs
Co-authored-by: Ruihang Xia <[email protected]>
* fix comment and support isNotNUll
* add test
* fix conflict
---------
Co-authored-by: Ruihang Xia <[email protected]>
---
.../datasource/physical_plan/parquet/row_groups.rs | 10 ++--
datafusion/core/src/physical_optimizer/pruning.rs | 38 +++++++++++---
datafusion/core/tests/parquet/mod.rs | 30 +++++++++++
datafusion/core/tests/parquet/row_group_pruning.rs | 60 ++++++++++++++++++++++
4 files changed, 128 insertions(+), 10 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 6600dd07d7..2b96659548 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -338,8 +338,10 @@ impl<'a> PruningStatistics for
RowGroupPruningStatistics<'a> {
scalar.to_array().ok()
}
- fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
- None
+ fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
+ let (c, _) = self.column(&column.name)?;
+ let scalar = ScalarValue::UInt64(Some(c.num_values() as u64));
+ scalar.to_array().ok()
}
fn contained(
@@ -1022,15 +1024,17 @@ mod tests {
column_statistics: Vec<ParquetStatistics>,
) -> RowGroupMetaData {
let mut columns = vec![];
+ let number_row = 1000;
for (i, s) in column_statistics.iter().enumerate() {
let column = ColumnChunkMetaData::builder(schema_descr.column(i))
.set_statistics(s.clone())
+ .set_num_values(number_row)
.build()
.unwrap();
columns.push(column);
}
RowGroupMetaData::builder(schema_descr.clone())
- .set_num_rows(1000)
+ .set_num_rows(number_row)
.set_total_byte_size(2000)
.set_column_metadata(columns)
.build()
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs
b/datafusion/core/src/physical_optimizer/pruning.rs
index dc7e0529de..ebb811408f 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -335,6 +335,7 @@ pub trait PruningStatistics {
/// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5
END`
/// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE
x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN
false ELSE y_min <= 10 AND 10 <= y_max END`
/// `x IS NULL` | `x_null_count > 0`
+/// `x IS NOT NULL` | `x_null_count = 0`
/// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false
ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END`
///
/// ## Predicate Evaluation
@@ -1239,10 +1240,15 @@ fn build_single_column_expr(
/// returns a pruning expression in terms of IsNull that will evaluate to true
/// if the column may contain null, and false if definitely does not
/// contain null.
+/// If `with_not` is set to true, the expression is treated as IS NOT NULL:
+/// Given an expression reference to `expr`, if `expr` is a column expression,
+/// returns a pruning expression in terms of IsNotNull that will evaluate to
+/// true if the column contains no nulls, and false if it definitely contains
+/// a null.
fn build_is_null_column_expr(
expr: &Arc<dyn PhysicalExpr>,
schema: &Schema,
required_columns: &mut RequiredColumns,
+ with_not: bool,
) -> Option<Arc<dyn PhysicalExpr>> {
if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
let field = schema.field_with_name(col.name()).ok()?;
@@ -1251,12 +1257,21 @@ fn build_is_null_column_expr(
required_columns
.null_count_column_expr(col, expr, null_count_field)
.map(|null_count_column_expr| {
- // IsNull(column) => null_count > 0
- Arc::new(phys_expr::BinaryExpr::new(
- null_count_column_expr,
- Operator::Gt,
-
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
- )) as _
+ if with_not {
+ // IsNotNull(column) => null_count = 0
+ Arc::new(phys_expr::BinaryExpr::new(
+ null_count_column_expr,
+ Operator::Eq,
+
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
+ )) as _
+ } else {
+ // IsNull(column) => null_count > 0
+ Arc::new(phys_expr::BinaryExpr::new(
+ null_count_column_expr,
+ Operator::Gt,
+
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
+ )) as _
+ }
})
.ok()
} else {
@@ -1287,9 +1302,18 @@ fn build_predicate_expression(
// predicate expression can only be a binary expression
let expr_any = expr.as_any();
if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
- return build_is_null_column_expr(is_null.arg(), schema,
required_columns)
+ return build_is_null_column_expr(is_null.arg(), schema,
required_columns, false)
.unwrap_or(unhandled);
}
+ if let Some(is_not_null) =
expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
+ return build_is_null_column_expr(
+ is_not_null.arg(),
+ schema,
+ required_columns,
+ true,
+ )
+ .unwrap_or(unhandled);
+ }
if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
return build_single_column_expr(col, schema, required_columns, false)
.unwrap_or(unhandled);
diff --git a/datafusion/core/tests/parquet/mod.rs
b/datafusion/core/tests/parquet/mod.rs
index b4415d638a..f36afe1976 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -28,6 +28,7 @@ use arrow::{
record_batch::RecordBatch,
util::pretty::pretty_format_batches,
};
+use arrow_array::new_null_array;
use chrono::{Datelike, Duration, TimeDelta};
use datafusion::{
datasource::{physical_plan::ParquetExec, provider_as_source,
TableProvider},
@@ -75,6 +76,7 @@ enum Scenario {
DecimalLargePrecisionBloomFilter,
ByteArray,
PeriodsInColumnNames,
+ WithNullValues,
}
enum Unit {
@@ -630,6 +632,27 @@ fn make_names_batch(name: &str, service_name_values:
Vec<&str>) -> RecordBatch {
RecordBatch::try_new(schema, vec![Arc::new(name),
Arc::new(service_name)]).unwrap()
}
+/// Return record batch with i8, i16, i32, and i64 sequences with all Null
values
+fn make_all_null_values() -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("i8", DataType::Int8, true),
+ Field::new("i16", DataType::Int16, true),
+ Field::new("i32", DataType::Int32, true),
+ Field::new("i64", DataType::Int64, true),
+ ]));
+
+ RecordBatch::try_new(
+ schema,
+ vec![
+ new_null_array(&DataType::Int8, 5),
+ new_null_array(&DataType::Int16, 5),
+ new_null_array(&DataType::Int32, 5),
+ new_null_array(&DataType::Int64, 5),
+ ],
+ )
+ .unwrap()
+}
+
fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
match scenario {
Scenario::Timestamps => {
@@ -799,6 +822,13 @@ fn create_data_batch(scenario: Scenario) ->
Vec<RecordBatch> {
),
]
}
+ Scenario::WithNullValues => {
+ vec![
+ make_all_null_values(),
+ make_int_batches(1, 6),
+ make_all_null_values(),
+ ]
+ }
}
}
diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs
b/datafusion/core/tests/parquet/row_group_pruning.rs
index 8fc7936552..29bf1ef0a8 100644
--- a/datafusion/core/tests/parquet/row_group_pruning.rs
+++ b/datafusion/core/tests/parquet/row_group_pruning.rs
@@ -1262,3 +1262,63 @@ async fn prune_periods_in_column_names() {
.test_row_group_prune()
.await;
}
+
+#[tokio::test]
+async fn test_row_group_with_null_values() {
+ // Three row groups:
+ // 1. all Null values
+ // 2. values from 1 to 5
+ // 3. all Null values
+
+ // After pruning, only row group 2 should be selected
+ RowGroupPruningTest::new()
+ .with_scenario(Scenario::WithNullValues)
+ .with_query("SELECT * FROM t WHERE \"i8\" <= 5")
+ .with_expected_errors(Some(0))
+ .with_matched_by_stats(Some(1))
+ .with_pruned_by_stats(Some(2))
+ .with_expected_rows(5)
+ .with_matched_by_bloom_filter(Some(0))
+ .with_pruned_by_bloom_filter(Some(0))
+ .test_row_group_prune()
+ .await;
+
+ // After pruning, only row group 1,3 should be selected
+ RowGroupPruningTest::new()
+ .with_scenario(Scenario::WithNullValues)
+ .with_query("SELECT * FROM t WHERE \"i8\" is Null")
+ .with_expected_errors(Some(0))
+ .with_matched_by_stats(Some(2))
+ .with_pruned_by_stats(Some(1))
+ .with_expected_rows(10)
+ .with_matched_by_bloom_filter(Some(0))
+ .with_pruned_by_bloom_filter(Some(0))
+ .test_row_group_prune()
+ .await;
+
+    // After pruning, only row group 2 should be selected
+ RowGroupPruningTest::new()
+ .with_scenario(Scenario::WithNullValues)
+ .with_query("SELECT * FROM t WHERE \"i16\" is Not Null")
+ .with_expected_errors(Some(0))
+ .with_matched_by_stats(Some(1))
+ .with_pruned_by_stats(Some(2))
+ .with_expected_rows(5)
+ .with_matched_by_bloom_filter(Some(0))
+ .with_pruned_by_bloom_filter(Some(0))
+ .test_row_group_prune()
+ .await;
+
+ // All row groups will be pruned
+ RowGroupPruningTest::new()
+ .with_scenario(Scenario::WithNullValues)
+ .with_query("SELECT * FROM t WHERE \"i32\" > 7")
+ .with_expected_errors(Some(0))
+ .with_matched_by_stats(Some(0))
+ .with_pruned_by_stats(Some(3))
+ .with_expected_rows(0)
+ .with_matched_by_bloom_filter(Some(0))
+ .with_pruned_by_bloom_filter(Some(0))
+ .test_row_group_prune()
+ .await;
+}