This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 11ffa679e8 feat: support for null, date, and timestamp types in
approx_distinct (#17618)
11ffa679e8 is described below
commit 11ffa679e8ce576594c5fd0f15b62655eaa60176
Author: dennis zhuang <[email protected]>
AuthorDate: Tue Sep 23 08:58:34 2025 +0800
feat: support for null, date, and timestamp types in approx_distinct
(#17618)
* feat: let approx_distinct handle null, date and timestamp types
Signed-off-by: Dennis Zhuang <[email protected]>
* chore: update testing submodule
Signed-off-by: Dennis Zhuang <[email protected]>
* feat: supports time type and refactor NullHLLAccumulator
Signed-off-by: Dennis Zhuang <[email protected]>
* bump arrow-testing submodule
---------
Signed-off-by: Dennis Zhuang <[email protected]>
Co-authored-by: Jefffrey <[email protected]>
---
.../functions-aggregate/src/approx_distinct.rs | 60 +++++++++++++++++++++-
datafusion/sqllogictest/test_files/aggregate.slt | 40 ++++++++++-----
testing | 2 +-
3 files changed, 85 insertions(+), 17 deletions(-)
diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs
b/datafusion/functions-aggregate/src/approx_distinct.rs
index 74aa1bf68c..abb144c045 100644
--- a/datafusion/functions-aggregate/src/approx_distinct.rs
+++ b/datafusion/functions-aggregate/src/approx_distinct.rs
@@ -23,8 +23,11 @@ use arrow::array::{
GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray,
};
use arrow::datatypes::{
- ArrowPrimitiveType, FieldRef, Int16Type, Int32Type, Int64Type, Int8Type,
UInt16Type,
- UInt32Type, UInt64Type, UInt8Type,
+ ArrowPrimitiveType, Date32Type, Date64Type, FieldRef, Int16Type, Int32Type,
+ Int64Type, Int8Type, Time32MillisecondType, Time32SecondType,
Time64MicrosecondType,
+ Time64NanosecondType, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType,
+ TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type,
UInt64Type,
+ UInt8Type,
};
use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field};
use datafusion_common::ScalarValue;
@@ -169,6 +172,9 @@ where
}
}
+#[derive(Debug)]
+struct NullHLLAccumulator;
+
macro_rules! default_accumulator_impl {
() => {
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
@@ -264,6 +270,29 @@ where
default_accumulator_impl!();
}
+impl Accumulator for NullHLLAccumulator {
+ fn update_batch(&mut self, _values: &[ArrayRef]) -> Result<()> {
+ // do nothing, all values are null
+ Ok(())
+ }
+
+ fn merge_batch(&mut self, _states: &[ArrayRef]) -> Result<()> {
+ Ok(())
+ }
+
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
+ Ok(vec![])
+ }
+
+ fn evaluate(&mut self) -> Result<ScalarValue> {
+ Ok(ScalarValue::UInt64(Some(0)))
+ }
+
+ fn size(&self) -> usize {
+ size_of_val(self)
+ }
+}
+
impl Debug for ApproxDistinct {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ApproxDistinct")
@@ -347,11 +376,38 @@ impl AggregateUDFImpl for ApproxDistinct {
DataType::Int16 =>
Box::new(NumericHLLAccumulator::<Int16Type>::new()),
DataType::Int32 =>
Box::new(NumericHLLAccumulator::<Int32Type>::new()),
DataType::Int64 =>
Box::new(NumericHLLAccumulator::<Int64Type>::new()),
+ DataType::Date32 =>
Box::new(NumericHLLAccumulator::<Date32Type>::new()),
+ DataType::Date64 =>
Box::new(NumericHLLAccumulator::<Date64Type>::new()),
+ DataType::Time32(TimeUnit::Second) => {
+ Box::new(NumericHLLAccumulator::<Time32SecondType>::new())
+ }
+ DataType::Time32(TimeUnit::Millisecond) => {
+ Box::new(NumericHLLAccumulator::<Time32MillisecondType>::new())
+ }
+ DataType::Time64(TimeUnit::Microsecond) => {
+ Box::new(NumericHLLAccumulator::<Time64MicrosecondType>::new())
+ }
+ DataType::Time64(TimeUnit::Nanosecond) => {
+ Box::new(NumericHLLAccumulator::<Time64NanosecondType>::new())
+ }
+ DataType::Timestamp(TimeUnit::Second, _) => {
+ Box::new(NumericHLLAccumulator::<TimestampSecondType>::new())
+ }
+ DataType::Timestamp(TimeUnit::Millisecond, _) => {
+
Box::new(NumericHLLAccumulator::<TimestampMillisecondType>::new())
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) => {
+
Box::new(NumericHLLAccumulator::<TimestampMicrosecondType>::new())
+ }
+ DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+
Box::new(NumericHLLAccumulator::<TimestampNanosecondType>::new())
+ }
DataType::Utf8 => Box::new(StringHLLAccumulator::<i32>::new()),
DataType::LargeUtf8 =>
Box::new(StringHLLAccumulator::<i64>::new()),
DataType::Utf8View =>
Box::new(StringViewHLLAccumulator::<i32>::new()),
DataType::Binary => Box::new(BinaryHLLAccumulator::<i32>::new()),
DataType::LargeBinary =>
Box::new(BinaryHLLAccumulator::<i64>::new()),
+ DataType::Null => Box::new(NullHLLAccumulator),
other => {
return not_impl_err!(
"Support for 'approx_distinct' for data type {other} is not
implemented"
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt
b/datafusion/sqllogictest/test_files/aggregate.slt
index 332b35b6ad..5375159c51 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -32,10 +32,12 @@ CREATE EXTERNAL TABLE aggregate_test_100 (
c10 BIGINT UNSIGNED NOT NULL,
c11 FLOAT NOT NULL,
c12 DOUBLE NOT NULL,
- c13 VARCHAR NOT NULL
+ c13 VARCHAR NOT NULL,
+ c14 DATE NOT NULL,
+ c15 TIMESTAMP NOT NULL,
)
STORED AS CSV
-LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+LOCATION '../../testing/data/csv/aggregate_test_100_with_dates.csv'
OPTIONS ('format.has_header' 'true');
statement ok
@@ -1307,12 +1309,24 @@ SELECT COUNT(2) FROM aggregate_test_100
# ----
# 100 99
+# csv_query_approx_count_literal_null
+query I
+SELECT approx_distinct(null)
+----
+0
+
# csv_query_approx_count_dupe_expr_aliased
query II
SELECT approx_distinct(c9) AS a, approx_distinct(c9) AS b FROM
aggregate_test_100
----
100 100
+# csv_query_approx_count_date_timestamp
+query IIIII
+SELECT approx_distinct(c14) AS a, approx_distinct(c15) AS b,
approx_distinct(arrow_cast(c15, 'Date64')), approx_distinct(arrow_cast(c15,
'Time32(Second)')) as c, approx_distinct(arrow_cast(c15, 'Time64(Nanosecond)'))
AS d FROM aggregate_test_100
+----
+18 60 60 60 60
+
## This test executes the APPROX_PERCENTILE_CONT aggregation against the test
## data, asserting the estimated quantiles are ±5% their actual values.
##
@@ -4719,9 +4733,7 @@ statement ok
create table t as
select
arrow_cast(column1, 'Date32') as date32,
- -- Workaround https://github.com/apache/arrow-rs/issues/4512 is fixed, can
use this
- -- arrow_cast(column1, 'Date64') as date64,
- arrow_cast(arrow_cast(column1, 'Date32'), 'Date64') as date64,
+ arrow_cast(column1, 'Date64') as date64,
column2 as names,
column3 as tag
from t_source;
@@ -5549,7 +5561,7 @@ physical_plan
08)--------------RepartitionExec: partitioning=Hash([c3@0], 4),
input_partitions=4
09)----------------AggregateExec: mode=Partial, gby=[c3@1 as c3],
aggr=[min(aggregate_test_100.c1)]
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-11)--------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c3], file_type=csv, has_header=true
+11)--------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]},
projection=[c1, c3], file_type=csv, has_header=true
#
@@ -5574,7 +5586,7 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3],
file_type=csv, has_header=true
+06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]},
projection=[c3], file_type=csv, has_header=true
query I
SELECT DISTINCT c3 FROM aggregate_test_100 group by c3 order by c3 limit 5;
@@ -5598,7 +5610,7 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[],
lim=[9]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2,
c3], file_type=csv, has_header=true
+06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]},
projection=[c2, c3], file_type=csv, has_header=true
query II
SELECT c2, c3 FROM aggregate_test_100 group by c2, c3 order by c2, c3 limit 5
offset 4;
@@ -5633,7 +5645,7 @@ physical_plan
10)------------------CoalesceBatchesExec: target_batch_size=8192
11)--------------------FilterExec: c3@1 >= 10 AND c3@1 <= 20
12)----------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-13)------------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2,
c3], file_type=csv, has_header=true
+13)------------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]},
projection=[c2, c3], file_type=csv, has_header=true
query I
SELECT DISTINCT c3 FROM aggregate_test_100 WHERE c3 between 10 and 20 group by
c3 order by c3 limit 4;
@@ -5659,7 +5671,7 @@ physical_plan
04)------CoalescePartitionsExec
05)--------AggregateExec: mode=Partial, gby=[c2@1 as c2, c3@2 as c3],
aggr=[max(aggregate_test_100.c1)]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-07)------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1,
c2, c3], file_type=csv, has_header=true
+07)------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]},
projection=[c1, c2, c3], file_type=csv, has_header=true
# TODO(msirek): Extend checking in LimitedDistinctAggregation equal groupings
to ignore the order of columns
# in the group-by column lists, so the limit could be pushed to the lowest
AggregateExec in this case
@@ -5683,7 +5695,7 @@ physical_plan
08)--------------CoalescePartitionsExec
09)----------------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3],
aggr=[]
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-11)--------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2,
c3], file_type=csv, has_header=true
+11)--------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]},
projection=[c2, c3], file_type=csv, has_header=true
query II
SELECT DISTINCT c3, c2 FROM aggregate_test_100 group by c3, c2 order by c3, c2
limit 3 offset 10;
@@ -5707,7 +5719,7 @@ physical_plan
04)------CoalescePartitionsExec
05)--------AggregateExec: mode=Partial, gby=[(NULL as c2, NULL as c3), (c2@0
as c2, NULL as c3), (c2@0 as c2, c3@1 as c3)], aggr=[]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-07)------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2,
c3], file_type=csv, has_header=true
+07)------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]},
projection=[c2, c3], file_type=csv, has_header=true
query II
SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) limit 3;
@@ -5734,7 +5746,7 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3],
file_type=csv, has_header=true
+06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]},
projection=[c3], file_type=csv, has_header=true
statement ok
set datafusion.optimizer.enable_distinct_aggregation_soft_limit = true;
@@ -6955,7 +6967,7 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[],
aggr=[count(aggregate_test_100.c5)]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5],
file_type=csv, has_header=true
+06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]},
projection=[c5], file_type=csv, has_header=true
statement count 0
drop table aggregate_test_100;
diff --git a/testing b/testing
index d2a1371230..0d60ccae40 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit d2a13712303498963395318a4eb42872e66aead7
+Subproject commit 0d60ccae40d0e8f2d22c15fafb01c5d4be8c63a6
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]