This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 11ffa679e8 feat: support for null, date, and timestamp types in approx_distinct (#17618)
11ffa679e8 is described below

commit 11ffa679e8ce576594c5fd0f15b62655eaa60176
Author: dennis zhuang <[email protected]>
AuthorDate: Tue Sep 23 08:58:34 2025 +0800

    feat: support for null, date, and timestamp types in approx_distinct (#17618)
    
    * feat: let approx_distinct handle null, date and timestamp types
    
    Signed-off-by: Dennis Zhuang <[email protected]>
    
    * chore: update testing submodule
    
    Signed-off-by: Dennis Zhuang <[email protected]>
    
    * feat: supports time type and refactor NullHLLAccumulator
    
    Signed-off-by: Dennis Zhuang <[email protected]>
    
    * bump arrow-testing submodule
    
    ---------
    
    Signed-off-by: Dennis Zhuang <[email protected]>
    Co-authored-by: Jefffrey <[email protected]>
---
 .../functions-aggregate/src/approx_distinct.rs     | 60 +++++++++++++++++++++-
 datafusion/sqllogictest/test_files/aggregate.slt   | 40 ++++++++++-----
 testing                                            |  2 +-
 3 files changed, 85 insertions(+), 17 deletions(-)
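
For context, a minimal sketch (not part of this commit; the table, file, and column names are hypothetical) of exercising the new support from DataFusion's Rust API:

    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        let ctx = SessionContext::new();
        // Hypothetical CSV with a TIMESTAMP column `ts`; any registered table works.
        ctx.register_csv("events", "events.csv", CsvReadOptions::new())
            .await?;
        // approx_distinct now also accepts NULL, date, time, and timestamp inputs.
        let df = ctx
            .sql("SELECT approx_distinct(ts), approx_distinct(NULL) FROM events")
            .await?;
        df.show().await?;
        Ok(())
    }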

diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs
index 74aa1bf68c..abb144c045 100644
--- a/datafusion/functions-aggregate/src/approx_distinct.rs
+++ b/datafusion/functions-aggregate/src/approx_distinct.rs
@@ -23,8 +23,11 @@ use arrow::array::{
     GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray,
 };
 use arrow::datatypes::{
-    ArrowPrimitiveType, FieldRef, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type,
-    UInt32Type, UInt64Type, UInt8Type,
+    ArrowPrimitiveType, Date32Type, Date64Type, FieldRef, Int16Type, Int32Type,
+    Int64Type, Int8Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
+    Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
+    TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type,
+    UInt8Type,
 };
 use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field};
 use datafusion_common::ScalarValue;
@@ -169,6 +172,9 @@ where
     }
 }
 
+#[derive(Debug)]
+struct NullHLLAccumulator;
+
 macro_rules! default_accumulator_impl {
     () => {
         fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
@@ -264,6 +270,29 @@ where
     default_accumulator_impl!();
 }
 
+impl Accumulator for NullHLLAccumulator {
+    fn update_batch(&mut self, _values: &[ArrayRef]) -> Result<()> {
+        // do nothing, all values are null
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, _states: &[ArrayRef]) -> Result<()> {
+        Ok(())
+    }
+
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![])
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        Ok(ScalarValue::UInt64(Some(0)))
+    }
+
+    fn size(&self) -> usize {
+        size_of_val(self)
+    }
+}
+
 impl Debug for ApproxDistinct {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("ApproxDistinct")
@@ -347,11 +376,38 @@ impl AggregateUDFImpl for ApproxDistinct {
             DataType::Int16 => Box::new(NumericHLLAccumulator::<Int16Type>::new()),
             DataType::Int32 => Box::new(NumericHLLAccumulator::<Int32Type>::new()),
             DataType::Int64 => Box::new(NumericHLLAccumulator::<Int64Type>::new()),
+            DataType::Date32 => Box::new(NumericHLLAccumulator::<Date32Type>::new()),
+            DataType::Date64 => Box::new(NumericHLLAccumulator::<Date64Type>::new()),
+            DataType::Time32(TimeUnit::Second) => {
+                Box::new(NumericHLLAccumulator::<Time32SecondType>::new())
+            }
+            DataType::Time32(TimeUnit::Millisecond) => {
+                Box::new(NumericHLLAccumulator::<Time32MillisecondType>::new())
+            }
+            DataType::Time64(TimeUnit::Microsecond) => {
+                Box::new(NumericHLLAccumulator::<Time64MicrosecondType>::new())
+            }
+            DataType::Time64(TimeUnit::Nanosecond) => {
+                Box::new(NumericHLLAccumulator::<Time64NanosecondType>::new())
+            }
+            DataType::Timestamp(TimeUnit::Second, _) => {
+                Box::new(NumericHLLAccumulator::<TimestampSecondType>::new())
+            }
+            DataType::Timestamp(TimeUnit::Millisecond, _) => {
+                Box::new(NumericHLLAccumulator::<TimestampMillisecondType>::new())
+            }
+            DataType::Timestamp(TimeUnit::Microsecond, _) => {
+                Box::new(NumericHLLAccumulator::<TimestampMicrosecondType>::new())
+            }
+            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+                Box::new(NumericHLLAccumulator::<TimestampNanosecondType>::new())
+            }
             DataType::Utf8 => Box::new(StringHLLAccumulator::<i32>::new()),
             DataType::LargeUtf8 => Box::new(StringHLLAccumulator::<i64>::new()),
             DataType::Utf8View => Box::new(StringViewHLLAccumulator::<i32>::new()),
             DataType::Binary => Box::new(BinaryHLLAccumulator::<i32>::new()),
             DataType::LargeBinary => Box::new(BinaryHLLAccumulator::<i64>::new()),
+            DataType::Null => Box::new(NullHLLAccumulator),
             other => {
                 return not_impl_err!(
                 "Support for 'approx_distinct' for data type {other} is not 
implemented"
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 332b35b6ad..5375159c51 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -32,10 +32,12 @@ CREATE EXTERNAL TABLE aggregate_test_100 (
   c10 BIGINT UNSIGNED NOT NULL,
   c11 FLOAT NOT NULL,
   c12 DOUBLE NOT NULL,
-  c13 VARCHAR NOT NULL
+  c13 VARCHAR NOT NULL,
+  c14 DATE NOT NULL,
+  c15 TIMESTAMP NOT NULL,
 )
 STORED AS CSV
-LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+LOCATION '../../testing/data/csv/aggregate_test_100_with_dates.csv'
 OPTIONS ('format.has_header' 'true');
 
 statement ok
@@ -1307,12 +1309,24 @@ SELECT COUNT(2) FROM aggregate_test_100
 # ----
 # 100 99
 
+# csv_query_approx_count_literal_null
+query I
+SELECT approx_distinct(null)
+----
+0
+
 # csv_query_approx_count_dupe_expr_aliased
 query II
 SELECT approx_distinct(c9) AS a, approx_distinct(c9) AS b FROM aggregate_test_100
 ----
 100 100
 
+# csv_query_approx_count_date_timestamp
+query IIIII
+SELECT approx_distinct(c14) AS a, approx_distinct(c15) AS b, approx_distinct(arrow_cast(c15, 'Date64')), approx_distinct(arrow_cast(c15, 'Time32(Second)')) as c, approx_distinct(arrow_cast(c15, 'Time64(Nanosecond)')) AS d FROM aggregate_test_100
+----
+18 60 60 60 60
+
 ## This test executes the APPROX_PERCENTILE_CONT aggregation against the test
 ## data, asserting the estimated quantiles are ±5% their actual values.
 ##
@@ -4719,9 +4733,7 @@ statement ok
 create table t as
 select
   arrow_cast(column1, 'Date32') as date32,
-  -- Workaround https://github.com/apache/arrow-rs/issues/4512 is fixed, can use this
-  -- arrow_cast(column1, 'Date64') as date64,
-  arrow_cast(arrow_cast(column1, 'Date32'), 'Date64') as date64,
+  arrow_cast(column1, 'Date64') as date64,
   column2 as names,
   column3 as tag
 from t_source;
@@ -5549,7 +5561,7 @@ physical_plan
 08)--------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
 09)----------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)]
 10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], file_type=csv, has_header=true
+11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c1, c3], file_type=csv, has_header=true
 
 
 #
@@ -5574,7 +5586,7 @@ physical_plan
 03)----CoalescePartitionsExec
 04)------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], file_type=csv, has_header=true
+06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c3], file_type=csv, has_header=true
 
 query I
 SELECT DISTINCT c3 FROM aggregate_test_100 group by c3 order by c3 limit 5;
@@ -5598,7 +5610,7 @@ physical_plan
 03)----CoalescePartitionsExec
 04)------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[], lim=[9]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
+06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
 
 query II
 SELECT c2, c3 FROM aggregate_test_100 group by c2, c3 order by c2, c3 limit 5 offset 4;
@@ -5633,7 +5645,7 @@ physical_plan
 10)------------------CoalesceBatchesExec: target_batch_size=8192
 11)--------------------FilterExec: c3@1 >= 10 AND c3@1 <= 20
 12)----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-13)------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
+13)------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
 
 query I
 SELECT DISTINCT c3 FROM aggregate_test_100 WHERE c3 between 10 and 20 group by c3 order by c3 limit 4;
@@ -5659,7 +5671,7 @@ physical_plan
 04)------CoalescePartitionsExec
 05)--------AggregateExec: mode=Partial, gby=[c2@1 as c2, c3@2 as c3], aggr=[max(aggregate_test_100.c1)]
 06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true
+07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true
 
 # TODO(msirek): Extend checking in LimitedDistinctAggregation equal groupings to ignore the order of columns
 # in the group-by column lists, so the limit could be pushed to the lowest AggregateExec in this case
@@ -5683,7 +5695,7 @@ physical_plan
 08)--------------CoalescePartitionsExec
 09)----------------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
 10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
+11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
 
 query II
 SELECT DISTINCT c3, c2 FROM aggregate_test_100 group by c3, c2 order by c3, c2 limit 3 offset 10;
@@ -5707,7 +5719,7 @@ physical_plan
 04)------CoalescePartitionsExec
 05)--------AggregateExec: mode=Partial, gby=[(NULL as c2, NULL as c3), (c2@0 as c2, NULL as c3), (c2@0 as c2, c3@1 as c3)], aggr=[]
 06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
+07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
 
 query II
 SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) limit 3;
@@ -5734,7 +5746,7 @@ physical_plan
 03)----CoalescePartitionsExec
 04)------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], file_type=csv, has_header=true
+06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c3], file_type=csv, has_header=true
 
 statement ok
 set datafusion.optimizer.enable_distinct_aggregation_soft_limit = true;
@@ -6955,7 +6967,7 @@ physical_plan
 03)----CoalescePartitionsExec
 04)------AggregateExec: mode=Partial, gby=[], aggr=[count(aggregate_test_100.c5)]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5], file_type=csv, has_header=true
+06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c5], file_type=csv, has_header=true
 
 statement count 0
 drop table aggregate_test_100;
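
A hedged Rust equivalent of the new approx_distinct(null) assertion above (the test scaffolding is an assumption, not part of this commit):

    use datafusion::arrow::array::UInt64Array;
    use datafusion::prelude::*;

    #[tokio::test]
    async fn approx_distinct_null_is_zero() -> datafusion::error::Result<()> {
        let ctx = SessionContext::new();
        let batches = ctx
            .sql("SELECT approx_distinct(NULL)")
            .await?
            .collect()
            .await?;
        let col = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .expect("approx_distinct evaluates to UInt64");
        // Matches the `0` expected by the slt test above.
        assert_eq!(col.value(0), 0);
        Ok(())
    }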
diff --git a/testing b/testing
index d2a1371230..0d60ccae40 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit d2a13712303498963395318a4eb42872e66aead7
+Subproject commit 0d60ccae40d0e8f2d22c15fafb01c5d4be8c63a6


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
