This is an automated email from the ASF dual-hosted git repository.
houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 03075d5 Fix null comparison for Parquet pruning predicate (#1595)
03075d5 is described below
commit 03075d5f4b3fdfd8f82144fcd409418832a4bf69
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Thu Jan 20 23:17:50 2022 -0800
Fix null comparison for Parquet pruning predicate (#1595)
* Fix null comparison for Parquet pruning predicate
* Use IsNull for null count predicate pruning
---
datafusion/src/physical_optimizer/pruning.rs | 66 ++++++++++++++-
.../src/physical_plan/file_format/parquet.rs | 96 ++++++++++++++++++----
2 files changed, 143 insertions(+), 19 deletions(-)
diff --git a/datafusion/src/physical_optimizer/pruning.rs
b/datafusion/src/physical_optimizer/pruning.rs
index 5f167e2..22b854b 100644
--- a/datafusion/src/physical_optimizer/pruning.rs
+++ b/datafusion/src/physical_optimizer/pruning.rs
@@ -37,6 +37,7 @@ use arrow::{
record_batch::RecordBatch,
};
+use crate::prelude::lit;
use crate::{
error::{DataFusionError, Result},
execution::context::ExecutionContextState,
@@ -75,6 +76,12 @@ pub trait PruningStatistics {
/// return the number of containers (e.g. row groups) being
/// pruned with these statistics
fn num_containers(&self) -> usize;
+
+ /// return the number of null values for the named column as an
+ /// `Option<ArrayRef>` containing a `UInt64Array`.
+ ///
+ /// Note: the returned array must contain `num_containers()` rows.
+ fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
}
/// Evaluates filter expressions on statistics in order to
@@ -200,7 +207,7 @@ impl PruningPredicate {
struct RequiredStatColumns {
/// The statistics required to evaluate this predicate:
/// * The unqualified column in the input schema
- /// * Statistics type (e.g. Min or Max)
+ /// * Statistics type (e.g. Min or Max or NullCount)
/// * The field the statistics value should be placed in for
/// pruning predicate evaluation
columns: Vec<(Column, StatisticsType, Field)>,
@@ -281,6 +288,22 @@ impl RequiredStatColumns {
) -> Result<Expr> {
self.stat_column_expr(column, column_expr, field, StatisticsType::Max,
"max")
}
+
+ /// rewrite col --> col_null_count
+ fn null_count_column_expr(
+ &mut self,
+ column: &Column,
+ column_expr: &Expr,
+ field: &Field,
+ ) -> Result<Expr> {
+ self.stat_column_expr(
+ column,
+ column_expr,
+ field,
+ StatisticsType::NullCount,
+ "null_count",
+ )
+ }
}
impl From<Vec<(Column, StatisticsType, Field)>> for RequiredStatColumns {
@@ -329,6 +352,7 @@ fn build_statistics_record_batch<S: PruningStatistics>(
let array = match statistics_type {
StatisticsType::Min => statistics.min_values(column),
StatisticsType::Max => statistics.max_values(column),
+ StatisticsType::NullCount => statistics.null_counts(column),
};
let array = array.unwrap_or_else(|| new_null_array(data_type,
num_containers));
@@ -582,6 +606,32 @@ fn build_single_column_expr(
}
}
+/// Given an expression reference to `expr`, if `expr` is a column expression,
+/// returns a pruning expression in terms of IsNull that will evaluate to true
+/// if the column may contain null, and false if the column definitely
+/// does not contain null.
+fn build_is_null_column_expr(
+ expr: &Expr,
+ schema: &Schema,
+ required_columns: &mut RequiredStatColumns,
+) -> Option<Expr> {
+ match expr {
+ Expr::Column(ref col) => {
+ let field = schema.field_with_name(&col.name).ok()?;
+
+ let null_count_field = &Field::new(field.name(), DataType::UInt64,
false);
+ required_columns
+ .null_count_column_expr(col, expr, null_count_field)
+ .map(|null_count_column_expr| {
+ // IsNull(column) => null_count > 0
+ null_count_column_expr.gt(lit::<u64>(0))
+ })
+ .ok()
+ }
+ _ => None,
+ }
+}
+
/// Translate logical filter expression into pruning predicate
/// expression that will evaluate to FALSE if it can be determined no
/// rows between the min/max values could pass the predicates.
@@ -602,6 +652,11 @@ fn build_predicate_expression(
// predicate expression can only be a binary expression
let (left, op, right) = match expr {
Expr::BinaryExpr { left, op, right } => (left, *op, right),
+ Expr::IsNull(expr) => {
+ let expr = build_is_null_column_expr(expr, schema,
required_columns)
+ .unwrap_or(unhandled);
+ return Ok(expr);
+ }
Expr::Column(col) => {
let expr = build_single_column_expr(col, schema, required_columns,
false)
.unwrap_or(unhandled);
@@ -702,6 +757,7 @@ fn build_statistics_expr(expr_builder: &mut
PruningExpressionBuilder) -> Result<
enum StatisticsType {
Min,
Max,
+ NullCount,
}
#[cfg(test)]
@@ -812,6 +868,10 @@ mod tests {
.map(|container_stats| container_stats.len())
.unwrap_or(0)
}
+
+ fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+ None
+ }
}
/// Returns the specified min/max container values
@@ -833,6 +893,10 @@ mod tests {
fn num_containers(&self) -> usize {
self.num_containers
}
+
+ fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+ None
+ }
}
#[test]
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs
b/datafusion/src/physical_plan/file_format/parquet.rs
index 17abb43..5768d0a 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -341,6 +341,31 @@ macro_rules! get_min_max_values {
}}
}
+// Extract the null count value on the ParquetStatistics
+macro_rules! get_null_count_values {
+ ($self:expr, $column:expr) => {{
+ let column_index =
+ if let Some((v, _)) =
$self.parquet_schema.column_with_name(&$column.name) {
+ v
+ } else {
+ // Named column was not present
+ return None;
+ };
+
+ let scalar_values: Vec<ScalarValue> = $self
+ .row_group_metadata
+ .iter()
+ .flat_map(|meta| meta.column(column_index).statistics())
+ .map(|stats| {
+
ScalarValue::UInt64(Some(stats.null_count().try_into().unwrap()))
+ })
+ .collect();
+
+ // ignore errors converting to arrays (e.g. different types)
+ ScalarValue::iter_to_array(scalar_values).ok()
+ }};
+}
+
impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
get_min_max_values!(self, column, min, min_bytes)
@@ -353,6 +378,10 @@ impl<'a> PruningStatistics for
RowGroupPruningStatistics<'a> {
fn num_containers(&self) -> usize {
self.row_group_metadata.len()
}
+
+ fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+ get_null_count_values!(self, column)
+ }
}
fn build_row_group_predicate(
@@ -710,21 +739,7 @@ mod tests {
Ok(())
}
- #[test]
- fn row_group_pruning_predicate_null_expr() -> Result<()> {
- use crate::logical_plan::{col, lit};
- // test row group predicate with an unknown (Null) expr
- //
- // int > 1 and bool = NULL => c1_max > 1 and null
- let expr = col("c1")
- .gt(lit(15))
- .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
- let schema = Arc::new(Schema::new(vec![
- Field::new("c1", DataType::Int32, false),
- Field::new("c2", DataType::Boolean, false),
- ]));
- let pruning_predicate = PruningPredicate::try_new(&expr, schema)?;
-
+ fn gen_row_group_meta_data_for_pruning_predicate() ->
Vec<RowGroupMetaData> {
let schema_descr = get_test_schema_descr(vec![
("c1", PhysicalType::INT32),
("c2", PhysicalType::BOOLEAN),
@@ -740,10 +755,56 @@ mod tests {
&schema_descr,
vec![
ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
- ParquetStatistics::boolean(Some(false), Some(true), None, 0,
false),
+ ParquetStatistics::boolean(Some(false), Some(true), None, 1,
false),
],
);
- let row_group_metadata = vec![rgm1, rgm2];
+ vec![rgm1, rgm2]
+ }
+
+ #[test]
+ fn row_group_pruning_predicate_null_expr() -> Result<()> {
+ use crate::logical_plan::{col, lit};
+ // int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0
+ let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, false),
+ Field::new("c2", DataType::Boolean, false),
+ ]));
+ let pruning_predicate = PruningPredicate::try_new(&expr, schema)?;
+ let row_group_metadata =
gen_row_group_meta_data_for_pruning_predicate();
+
+ let row_group_predicate = build_row_group_predicate(
+ &pruning_predicate,
+ parquet_file_metrics(),
+ &row_group_metadata,
+ );
+ let row_group_filter = row_group_metadata
+ .iter()
+ .enumerate()
+ .map(|(i, g)| row_group_predicate(g, i))
+ .collect::<Vec<_>>();
+ // First row group was filtered out because it contains no null value
on "c2".
+ assert_eq!(row_group_filter, vec![false, true]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn row_group_pruning_predicate_eq_null_expr() -> Result<()> {
+ use crate::logical_plan::{col, lit};
+ // test row group predicate with an unknown (Null) expr
+ //
+ // int > 1 and bool = NULL => c1_max > 1 and null
+ let expr = col("c1")
+ .gt(lit(15))
+ .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, false),
+ Field::new("c2", DataType::Boolean, false),
+ ]));
+ let pruning_predicate = PruningPredicate::try_new(&expr, schema)?;
+ let row_group_metadata =
gen_row_group_meta_data_for_pruning_predicate();
+
let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
@@ -756,7 +817,6 @@ mod tests {
.collect::<Vec<_>>();
// no row group is filtered out because the predicate expression can't
be evaluated
// when a null array is generated for a statistics column,
- // because the null values propagate to the end result, making the
predicate result undefined
assert_eq!(row_group_filter, vec![true, true]);
Ok(())