This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 34fbe8e216 Fix count on all null `VALUES` clause (#13029)
34fbe8e216 is described below
commit 34fbe8e2161cd56171a04eeae6bb36c2b00040d9
Author: Piotr Findeisen <[email protected]>
AuthorDate: Mon Oct 21 22:39:08 2024 +0200
Fix count on all null `VALUES` clause (#13029)
* Test Count accumulator with all-nulls
* Fix count on null values
Before the change, a `ValuesExec` containing a `NullArray` would
incorrectly report column statistics as being non-null, which would
misinform the `AggregateStatistics` optimizer and make it fold
`count(always_null)` into the row count instead of 0.
This commit fixes the column statistics derivation for values with
`NullArray` and therefore fixes execution of logical plans with count
over such values.
Note that the bug was not reproducible using the DataFusion SQL frontend,
because in DataFusion SQL `VALUES (NULL)` does not have type
`DataType::Null` (it gets some other, apparently arbitrarily picked,
type instead).
As a follow-up, all usages of `Array::null_count` should be inspected.
The function can easily be misused: it returns the number of "physical"
nulls, and a `NullArray` has none even though all of its values are null.
---
datafusion/core/tests/core_integration.rs | 3 +
datafusion/core/tests/execution/logical_plan.rs | 95 +++++++++++++++++++++++++
datafusion/core/tests/execution/mod.rs | 18 +++++
datafusion/functions-aggregate/src/count.rs | 14 ++++
datafusion/physical-plan/src/common.rs | 6 +-
datafusion/physical-plan/src/values.rs | 31 ++++++++
6 files changed, 166 insertions(+), 1 deletion(-)
diff --git a/datafusion/core/tests/core_integration.rs
b/datafusion/core/tests/core_integration.rs
index 79e5056e3c..e0917e6cca 100644
--- a/datafusion/core/tests/core_integration.rs
+++ b/datafusion/core/tests/core_integration.rs
@@ -24,6 +24,9 @@ mod dataframe;
/// Run all tests that are found in the `macro_hygiene` directory
mod macro_hygiene;
+/// Run all tests that are found in the `execution` directory
+mod execution;
+
/// Run all tests that are found in the `expr_api` directory
mod expr_api;
diff --git a/datafusion/core/tests/execution/logical_plan.rs
b/datafusion/core/tests/execution/logical_plan.rs
new file mode 100644
index 0000000000..168bf484e5
--- /dev/null
+++ b/datafusion/core/tests/execution/logical_plan.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::Int64Array;
+use arrow_schema::{DataType, Field};
+use datafusion::execution::session_state::SessionStateBuilder;
+use datafusion_common::{Column, DFSchema, Result, ScalarValue};
+use datafusion_execution::TaskContext;
+use datafusion_expr::expr::AggregateFunction;
+use datafusion_expr::logical_plan::{LogicalPlan, Values};
+use datafusion_expr::{Aggregate, AggregateUDF, Expr};
+use datafusion_functions_aggregate::count::Count;
+use datafusion_physical_plan::collect;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::ops::Deref;
+use std::sync::Arc;
+
+// Logical plans need to provide stable semantics, as downstream projects
+// create them and depend on them. These tests verify the execution
+// semantics of hand-built logical plans.
+
+/// `count(col)` over a column holding only untyped NULLs (`DataType::Null`)
+/// must evaluate to 0 rather than to the number of input rows.
+#[tokio::test]
+async fn count_only_nulls() -> Result<()> {
+    // Input: VALUES (NULL), (NULL), (NULL) AS _(col)
+    let schema = DFSchema::from_unqualified_fields(
+        vec![Field::new("col", DataType::Null, true)].into(),
+        HashMap::new(),
+    )?;
+    let null_row = vec![Expr::Literal(ScalarValue::Null)];
+    let values = LogicalPlan::Values(Values {
+        schema: Arc::new(schema),
+        values: vec![null_row; 3],
+    });
+
+    // Aggregation: count(col) AS count
+    let count_expr = Expr::AggregateFunction(AggregateFunction {
+        func: Arc::new(AggregateUDF::new_from_impl(Count::new())),
+        args: vec![Expr::Column(Column::from_name("col"))],
+        distinct: false,
+        filter: None,
+        order_by: None,
+        null_treatment: None,
+    });
+    let plan = LogicalPlan::Aggregate(Aggregate::try_new(
+        Arc::new(values),
+        vec![],
+        vec![count_expr],
+    )?);
+
+    // Execute and verify results
+    let state = SessionStateBuilder::new().build();
+    let physical_plan = state.create_physical_plan(&plan).await?;
+    let task_ctx = Arc::new(TaskContext::from(&state));
+    let batches = collect(physical_plan, task_ctx).await?;
+
+    let batch = only(batches.as_slice());
+    let batch_schema = batch.schema();
+    let field = only(batch_schema.fields().deref());
+    let column = only(batch.columns());
+
+    assert_eq!(field.data_type(), &DataType::Int64); // TODO should be UInt64
+    assert_eq!(column.deref(), &Int64Array::from(vec![0]));
+
+    Ok(())
+}
+
+/// Returns a reference to the single element of `elements`.
+///
+/// # Panics
+///
+/// Panics if `elements` does not contain exactly one element. The panic is
+/// attributed to the caller (via `#[track_caller]`), so a failing test points
+/// at the assertion site rather than at this helper.
+#[track_caller]
+fn only<T>(elements: &[T]) -> &T
+where
+    T: Debug,
+{
+    let [element] = elements else {
+        panic!("Expected exactly one element, got {:?}", elements);
+    };
+    element
+}
diff --git a/datafusion/core/tests/execution/mod.rs
b/datafusion/core/tests/execution/mod.rs
new file mode 100644
index 0000000000..8169db1a46
--- /dev/null
+++ b/datafusion/core/tests/execution/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod logical_plan;
diff --git a/datafusion/functions-aggregate/src/count.rs
b/datafusion/functions-aggregate/src/count.rs
index 61dbfd6749..b4eeb937d4 100644
--- a/datafusion/functions-aggregate/src/count.rs
+++ b/datafusion/functions-aggregate/src/count.rs
@@ -715,3 +715,17 @@ impl Accumulator for DistinctCountAccumulator {
}
}
}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::NullArray;
+
+    /// Feeding `CountAccumulator` a batch that consists solely of a
+    /// `NullArray` must leave the count at zero.
+    #[test]
+    fn count_accumulator_nulls() -> Result<()> {
+        let mut acc = CountAccumulator::new();
+        acc.update_batch(&[Arc::new(NullArray::new(10))])?;
+
+        let counted = acc.evaluate()?;
+        assert_eq!(counted, ScalarValue::Int64(Some(0)));
+        Ok(())
+    }
+}
diff --git a/datafusion/physical-plan/src/common.rs
b/datafusion/physical-plan/src/common.rs
index 4b5eea6b76..5abdf367c5 100644
--- a/datafusion/physical-plan/src/common.rs
+++ b/datafusion/physical-plan/src/common.rs
@@ -156,7 +156,11 @@ pub fn compute_record_batch_statistics(
for partition in batches.iter() {
for batch in partition {
for (stat_index, col_index) in projection.iter().enumerate() {
- null_counts[stat_index] +=
batch.column(*col_index).null_count();
+ null_counts[stat_index] += batch
+ .column(*col_index)
+ .logical_nulls()
+ .map(|nulls| nulls.null_count())
+ .unwrap_or_default();
}
}
}
diff --git a/datafusion/physical-plan/src/values.rs
b/datafusion/physical-plan/src/values.rs
index e01aea1fdd..ab5b45463b 100644
--- a/datafusion/physical-plan/src/values.rs
+++ b/datafusion/physical-plan/src/values.rs
@@ -219,6 +219,7 @@ mod tests {
use crate::test::{self, make_partition};
use arrow_schema::{DataType, Field};
+ use datafusion_common::stats::{ColumnStatistics, Precision};
#[tokio::test]
async fn values_empty_case() -> Result<()> {
@@ -269,4 +270,34 @@ mod tests {
let _ = ValuesExec::try_new(schema,
vec![vec![lit(ScalarValue::UInt32(None))]])
.unwrap_err();
}
+
+    /// A `ValuesExec` over a `DataType::Null` column must report every row as
+    /// null. `NullArray` reports no physical nulls, so the statistics have to
+    /// be derived from logical nulls.
+    #[test]
+    fn values_stats_with_nulls_only() -> Result<()> {
+        let data = vec![
+            vec![lit(ScalarValue::Null)],
+            vec![lit(ScalarValue::Null)],
+            vec![lit(ScalarValue::Null)],
+        ];
+        let rows = data.len();
+        let values = ValuesExec::try_new(
+            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
+            data,
+        )?;
+
+        let stats = values.statistics()?;
+        assert_eq!(stats.num_rows, Precision::Exact(rows));
+        // `total_byte_size` is deliberately not asserted: it depends on Arrow's
+        // memory accounting for `NullArray` and is irrelevant to this test.
+        assert_eq!(
+            stats.column_statistics,
+            vec![ColumnStatistics {
+                null_count: Precision::Exact(rows), // there are only nulls
+                distinct_count: Precision::Absent,
+                max_value: Precision::Absent,
+                min_value: Precision::Absent,
+            }]
+        );
+
+        Ok(())
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]