This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 34fbe8e216 Fix count on all null `VALUES` clause (#13029)
34fbe8e216 is described below

commit 34fbe8e2161cd56171a04eeae6bb36c2b00040d9
Author: Piotr Findeisen <[email protected]>
AuthorDate: Mon Oct 21 22:39:08 2024 +0200

    Fix count on all null `VALUES` clause (#13029)
    
    * Test Count accumulator with all-nulls
    
    * Fix count on null values
    
    Before the change, a `ValuesExec` containing a `NullArray` would
    incorrectly report column statistics as being non-null, which would
    misinform the `AggregateStatistics` optimizer and cause it to fold
    `count(always_null)` into the row count instead of 0.
    
    This commit fixes the column statistics derivation for values with
    `NullArray` and therefore fixes execution of logical plans with count
    over such values.
    
    Note that the bug was not reproducible using the DataFusion SQL
    frontend, because in DataFusion SQL the `VALUES (NULL)` doesn't have
    type `DataType::Null` (it has some apparently arbitrarily picked type
    instead).
    
    As a follow-up, all usages of `Array::null_count` should be inspected.
    The function can easily be misused (it returns "physical nulls", which
    do not exist for the null type).
---
 datafusion/core/tests/core_integration.rs       |  3 +
 datafusion/core/tests/execution/logical_plan.rs | 95 +++++++++++++++++++++++++
 datafusion/core/tests/execution/mod.rs          | 18 +++++
 datafusion/functions-aggregate/src/count.rs     | 14 ++++
 datafusion/physical-plan/src/common.rs          |  6 +-
 datafusion/physical-plan/src/values.rs          | 31 ++++++++
 6 files changed, 166 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/tests/core_integration.rs 
b/datafusion/core/tests/core_integration.rs
index 79e5056e3c..e0917e6cca 100644
--- a/datafusion/core/tests/core_integration.rs
+++ b/datafusion/core/tests/core_integration.rs
@@ -24,6 +24,9 @@ mod dataframe;
 /// Run all tests that are found in the `macro_hygiene` directory
 mod macro_hygiene;
 
+/// Run all tests that are found in the `execution` directory
+mod execution;
+
 /// Run all tests that are found in the `expr_api` directory
 mod expr_api;
 
diff --git a/datafusion/core/tests/execution/logical_plan.rs 
b/datafusion/core/tests/execution/logical_plan.rs
new file mode 100644
index 0000000000..168bf484e5
--- /dev/null
+++ b/datafusion/core/tests/execution/logical_plan.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::Int64Array;
+use arrow_schema::{DataType, Field};
+use datafusion::execution::session_state::SessionStateBuilder;
+use datafusion_common::{Column, DFSchema, Result, ScalarValue};
+use datafusion_execution::TaskContext;
+use datafusion_expr::expr::AggregateFunction;
+use datafusion_expr::logical_plan::{LogicalPlan, Values};
+use datafusion_expr::{Aggregate, AggregateUDF, Expr};
+use datafusion_functions_aggregate::count::Count;
+use datafusion_physical_plan::collect;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::ops::Deref;
+use std::sync::Arc;
+
+///! Logical plans need to provide stable semantics, as downstream projects
+///! create them and depend on them. Test executable semantics of logical 
plans.
+
+#[tokio::test]
+async fn count_only_nulls() -> Result<()> {
+    // Input: VALUES (NULL), (NULL), (NULL) AS _(col)
+    let input_schema = Arc::new(DFSchema::from_unqualified_fields(
+        vec![Field::new("col", DataType::Null, true)].into(),
+        HashMap::new(),
+    )?);
+    let input = Arc::new(LogicalPlan::Values(Values {
+        schema: input_schema,
+        values: vec![
+            vec![Expr::Literal(ScalarValue::Null)],
+            vec![Expr::Literal(ScalarValue::Null)],
+            vec![Expr::Literal(ScalarValue::Null)],
+        ],
+    }));
+    let input_col_ref = Expr::Column(Column {
+        relation: None,
+        name: "col".to_string(),
+    });
+
+    // Aggregation: count(col) AS count
+    let aggregate = LogicalPlan::Aggregate(Aggregate::try_new(
+        input,
+        vec![],
+        vec![Expr::AggregateFunction(AggregateFunction {
+            func: Arc::new(AggregateUDF::new_from_impl(Count::new())),
+            args: vec![input_col_ref],
+            distinct: false,
+            filter: None,
+            order_by: None,
+            null_treatment: None,
+        })],
+    )?);
+
+    // Execute and verify results
+    let session_state = SessionStateBuilder::new().build();
+    let physical_plan = session_state.create_physical_plan(&aggregate).await?;
+    let result =
+        collect(physical_plan, 
Arc::new(TaskContext::from(&session_state))).await?;
+
+    let result = only(result.as_slice());
+    let result_schema = result.schema();
+    let field = only(result_schema.fields().deref());
+    let column = only(result.columns());
+
+    assert_eq!(field.data_type(), &DataType::Int64); // TODO should be UInt64
+    assert_eq!(column.deref(), &Int64Array::from(vec![0]));
+
+    Ok(())
+}
+
+fn only<T>(elements: &[T]) -> &T
+where
+    T: Debug,
+{
+    let [element] = elements else {
+        panic!("Expected exactly one element, got {:?}", elements);
+    };
+    element
+}
diff --git a/datafusion/core/tests/execution/mod.rs 
b/datafusion/core/tests/execution/mod.rs
new file mode 100644
index 0000000000..8169db1a46
--- /dev/null
+++ b/datafusion/core/tests/execution/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod logical_plan;
diff --git a/datafusion/functions-aggregate/src/count.rs 
b/datafusion/functions-aggregate/src/count.rs
index 61dbfd6749..b4eeb937d4 100644
--- a/datafusion/functions-aggregate/src/count.rs
+++ b/datafusion/functions-aggregate/src/count.rs
@@ -715,3 +715,17 @@ impl Accumulator for DistinctCountAccumulator {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::NullArray;
+
+    #[test]
+    fn count_accumulator_nulls() -> Result<()> {
+        let mut accumulator = CountAccumulator::new();
+        accumulator.update_batch(&[Arc::new(NullArray::new(10))])?;
+        assert_eq!(accumulator.evaluate()?, ScalarValue::Int64(Some(0)));
+        Ok(())
+    }
+}
diff --git a/datafusion/physical-plan/src/common.rs 
b/datafusion/physical-plan/src/common.rs
index 4b5eea6b76..5abdf367c5 100644
--- a/datafusion/physical-plan/src/common.rs
+++ b/datafusion/physical-plan/src/common.rs
@@ -156,7 +156,11 @@ pub fn compute_record_batch_statistics(
     for partition in batches.iter() {
         for batch in partition {
             for (stat_index, col_index) in projection.iter().enumerate() {
-                null_counts[stat_index] += 
batch.column(*col_index).null_count();
+                null_counts[stat_index] += batch
+                    .column(*col_index)
+                    .logical_nulls()
+                    .map(|nulls| nulls.null_count())
+                    .unwrap_or_default();
             }
         }
     }
diff --git a/datafusion/physical-plan/src/values.rs 
b/datafusion/physical-plan/src/values.rs
index e01aea1fdd..ab5b45463b 100644
--- a/datafusion/physical-plan/src/values.rs
+++ b/datafusion/physical-plan/src/values.rs
@@ -219,6 +219,7 @@ mod tests {
     use crate::test::{self, make_partition};
 
     use arrow_schema::{DataType, Field};
+    use datafusion_common::stats::{ColumnStatistics, Precision};
 
     #[tokio::test]
     async fn values_empty_case() -> Result<()> {
@@ -269,4 +270,34 @@ mod tests {
         let _ = ValuesExec::try_new(schema, 
vec![vec![lit(ScalarValue::UInt32(None))]])
             .unwrap_err();
     }
+
+    #[test]
+    fn values_stats_with_nulls_only() -> Result<()> {
+        let data = vec![
+            vec![lit(ScalarValue::Null)],
+            vec![lit(ScalarValue::Null)],
+            vec![lit(ScalarValue::Null)],
+        ];
+        let rows = data.len();
+        let values = ValuesExec::try_new(
+            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, 
true)])),
+            data,
+        )?;
+
+        assert_eq!(
+            values.statistics()?,
+            Statistics {
+                num_rows: Precision::Exact(rows),
+                total_byte_size: Precision::Exact(8), // not important
+                column_statistics: vec![ColumnStatistics {
+                    null_count: Precision::Exact(rows), // there are only nulls
+                    distinct_count: Precision::Absent,
+                    max_value: Precision::Absent,
+                    min_value: Precision::Absent,
+                },],
+            }
+        );
+
+        Ok(())
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to