This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ecc04d4af8 feat: Support faster multi-column grouping (`GroupColumn`)
for `Date/Time/Timestamp` types (#13457)
ecc04d4af8 is described below
commit ecc04d4af85a29111a1598e615350fea84e60fcb
Author: Jonathan Chen <[email protected]>
AuthorDate: Wed Nov 20 06:59:21 2024 -0500
feat: Support faster multi-column grouping (`GroupColumn`) for
`Date/Time/Timestamp` types (#13457)
* feat: Add `GroupColumn` for `Date/Time/Timestamp`
* Add tests
---
.../src/aggregates/group_values/mod.rs | 28 +++
.../aggregates/group_values/multi_group_by/mod.rs | 42 ++++-
datafusion/sqllogictest/test_files/group_by.slt | 196 +++++++++++++++++++++
3 files changed, 263 insertions(+), 3 deletions(-)
diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
index a816203b68..ae528daad5 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -18,7 +18,13 @@
//! [`GroupValues`] trait for storing and interning group keys
use arrow::record_batch::RecordBatch;
+use arrow_array::types::{
+ Date32Type, Date64Type, Time32MillisecondType, Time32SecondType,
+ Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType,
+ TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
+};
use arrow_array::{downcast_primitive, ArrayRef};
+use arrow_schema::TimeUnit;
use arrow_schema::{DataType, SchemaRef};
use datafusion_common::Result;
@@ -142,6 +148,28 @@ pub(crate) fn new_group_values(
}
match d {
+ DataType::Date32 => {
+ downcast_helper!(Date32Type, d);
+ }
+ DataType::Date64 => {
+ downcast_helper!(Date64Type, d);
+ }
+ DataType::Time32(t) => match t {
+ TimeUnit::Second => downcast_helper!(Time32SecondType, d),
+ TimeUnit::Millisecond =>
downcast_helper!(Time32MillisecondType, d),
+ _ => {}
+ },
+ DataType::Time64(t) => match t {
+ TimeUnit::Microsecond =>
downcast_helper!(Time64MicrosecondType, d),
+ TimeUnit::Nanosecond => downcast_helper!(Time64NanosecondType,
d),
+ _ => {}
+ },
+ DataType::Timestamp(t, _) => match t {
+ TimeUnit::Second => downcast_helper!(TimestampSecondType, d),
+ TimeUnit::Millisecond =>
downcast_helper!(TimestampMillisecondType, d),
+ TimeUnit::Microsecond =>
downcast_helper!(TimestampMicrosecondType, d),
+ TimeUnit::Nanosecond =>
downcast_helper!(TimestampNanosecondType, d),
+ },
DataType::Utf8 => {
return
Ok(Box::new(GroupValuesByes::<i32>::new(OutputType::Utf8)));
}
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
index 83b0f9d773..10b00cf74f 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
@@ -32,12 +32,14 @@ use ahash::RandomState;
use arrow::compute::cast;
use arrow::datatypes::{
BinaryViewType, Date32Type, Date64Type, Float32Type, Float64Type,
Int16Type,
- Int32Type, Int64Type, Int8Type, StringViewType, UInt16Type, UInt32Type,
UInt64Type,
- UInt8Type,
+ Int32Type, Int64Type, Int8Type, StringViewType, Time32MillisecondType,
+ Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
+ TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType,
+ TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use arrow::record_batch::RecordBatch;
use arrow_array::{Array, ArrayRef};
-use arrow_schema::{DataType, Schema, SchemaRef};
+use arrow_schema::{DataType, Schema, SchemaRef, TimeUnit};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
@@ -913,6 +915,38 @@ impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
}
&DataType::Date32 => instantiate_primitive!(v, nullable,
Date32Type),
&DataType::Date64 => instantiate_primitive!(v, nullable,
Date64Type),
+ &DataType::Time32(t) => match t {
+ TimeUnit::Second => {
+ instantiate_primitive!(v, nullable,
Time32SecondType)
+ }
+ TimeUnit::Millisecond => {
+ instantiate_primitive!(v, nullable,
Time32MillisecondType)
+ }
+ _ => {}
+ },
+ &DataType::Time64(t) => match t {
+ TimeUnit::Microsecond => {
+ instantiate_primitive!(v, nullable,
Time64MicrosecondType)
+ }
+ TimeUnit::Nanosecond => {
+ instantiate_primitive!(v, nullable,
Time64NanosecondType)
+ }
+ _ => {}
+ },
+ &DataType::Timestamp(t, _) => match t {
+ TimeUnit::Second => {
+ instantiate_primitive!(v, nullable,
TimestampSecondType)
+ }
+ TimeUnit::Millisecond => {
+ instantiate_primitive!(v, nullable,
TimestampMillisecondType)
+ }
+ TimeUnit::Microsecond => {
+ instantiate_primitive!(v, nullable,
TimestampMicrosecondType)
+ }
+ TimeUnit::Nanosecond => {
+ instantiate_primitive!(v, nullable,
TimestampNanosecondType)
+ }
+ },
&DataType::Utf8 => {
let b =
ByteGroupValueBuilder::<i32>::new(OutputType::Utf8);
v.push(Box::new(b) as _)
@@ -1125,6 +1159,8 @@ fn supported_type(data_type: &DataType) -> bool {
| DataType::LargeBinary
| DataType::Date32
| DataType::Date64
+ | DataType::Time32(_)
+ | DataType::Timestamp(_, _)
| DataType::Utf8View
| DataType::BinaryView
)
diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt
index 391f848368..f74e1006f7 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -5272,6 +5272,201 @@ drop view t
statement ok
drop table source;
+# Test multi group by int + Date32
+statement ok
+create table source as values
+(1, '2020-01-01'),
+(1, '2020-01-01'),
+(2, '2020-01-02'),
+(2, '2020-01-03'),
+(3, '2020-01-04'),
+(3, '2020-01-04'),
+(2, '2020-01-03'),
+(null, null),
+(null, '2020-01-01'),
+(null, null),
+(null, '2020-01-01'),
+(2, '2020-01-02'),
+(2, '2020-01-02'),
+(1, null)
+;
+
+statement ok
+create view t as select column1 as a, arrow_cast(column2, 'Date32') as b from
source;
+
+query IDI
+select a, b, count(*) from t group by a, b order by a, b;
+----
+1 2020-01-01 2
+1 NULL 1
+2 2020-01-02 3
+2 2020-01-03 2
+3 2020-01-04 2
+NULL 2020-01-01 2
+NULL NULL 2
+
+statement ok
+drop view t
+
+statement ok
+drop table source;
+
+# Test multi group by int + Date64
+statement ok
+create table source as values
+(1, '2020-01-01'),
+(1, '2020-01-01'),
+(2, '2020-01-02'),
+(2, '2020-01-03'),
+(3, '2020-01-04'),
+(3, '2020-01-04'),
+(2, '2020-01-03'),
+(null, null),
+(null, '2020-01-01'),
+(null, null),
+(null, '2020-01-01'),
+(2, '2020-01-02'),
+(2, '2020-01-02'),
+(1, null)
+;
+
+statement ok
+create view t as select column1 as a, arrow_cast(column2, 'Date64') as b from
source;
+
+query IDI
+select a, b, count(*) from t group by a, b order by a, b;
+----
+1 2020-01-01T00:00:00 2
+1 NULL 1
+2 2020-01-02T00:00:00 3
+2 2020-01-03T00:00:00 2
+3 2020-01-04T00:00:00 2
+NULL 2020-01-01T00:00:00 2
+NULL NULL 2
+
+statement ok
+drop view t
+
+statement ok
+drop table source;
+
+# Test multi group by int + Time32
+statement ok
+create table source as values
+(1, '12:34:56'),
+(1, '12:34:56'),
+(2, '13:00:00'),
+(2, '14:15:00'),
+(3, '23:59:59'),
+(3, '23:59:59'),
+(2, '14:15:00'),
+(null, null),
+(null, '12:00:00'),
+(null, null),
+(null, '12:00:00'),
+(2, '13:00:00'),
+(2, '13:00:00'),
+(1, null)
+;
+
+statement ok
+create view t as select column1 as a, arrow_cast(column2, 'Time32(Second)') as
b from source;
+
+query IDI
+select a, b, count(*) from t group by a, b order by a, b;
+----
+1 12:34:56 2
+1 NULL 1
+2 13:00:00 3
+2 14:15:00 2
+3 23:59:59 2
+NULL 12:00:00 2
+NULL NULL 2
+
+statement ok
+drop view t
+
+statement ok
+drop table source;
+
+# Test multi group by int + Time64
+statement ok
+create table source as values
+(1, '12:34:56.123456'),
+(1, '12:34:56.123456'),
+(2, '13:00:00.000001'),
+(2, '14:15:00.999999'),
+(3, '23:59:59.500000'),
+(3, '23:59:59.500000'),
+(2, '14:15:00.999999'),
+(null, null),
+(null, '12:00:00.000000'),
+(null, null),
+(null, '12:00:00.000000'),
+(2, '13:00:00.000001'),
+(2, '13:00:00.000001'),
+(1, null)
+;
+
+statement ok
+create view t as select column1 as a, arrow_cast(column2,
'Time64(Microsecond)') as b from source;
+
+query IDI
+select a, b, count(*) from t group by a, b order by a, b;
+----
+1 12:34:56.123456 2
+1 NULL 1
+2 13:00:00.000001 3
+2 14:15:00.999999 2
+3 23:59:59.500 2
+NULL 12:00:00 2
+NULL NULL 2
+
+statement ok
+drop view t
+
+statement ok
+drop table source;
+
+# Test multi group by int + Timestamp
+statement ok
+create table source as values
+(1, '2020-01-01 12:34:56'),
+(1, '2020-01-01 12:34:56'),
+(2, '2020-01-02 13:00:00'),
+(2, '2020-01-03 14:15:00'),
+(3, '2020-01-04 23:59:59'),
+(3, '2020-01-04 23:59:59'),
+(2, '2020-01-03 14:15:00'),
+(null, null),
+(null, '2020-01-01 12:00:00'),
+(null, null),
+(null, '2020-01-01 12:00:00'),
+(2, '2020-01-02 13:00:00'),
+(2, '2020-01-02 13:00:00'),
+(1, null)
+;
+
+statement ok
+create view t as select column1 as a, arrow_cast(column2,
'Timestamp(Nanosecond, None)') as b from source;
+
+query IPI
+select a, b, count(*) from t group by a, b order by a, b;
+----
+1 2020-01-01T12:34:56 2
+1 NULL 1
+2 2020-01-02T13:00:00 3
+2 2020-01-03T14:15:00 2
+3 2020-01-04T23:59:59 2
+NULL 2020-01-01T12:00:00 2
+NULL NULL 2
+
+statement ok
+drop view t
+
+statement ok
+drop table source;
+
# Test whether min, max accumulator produces NaN result when input is NaN.
# See https://github.com/apache/datafusion/issues/13415 for rationale
statement ok
@@ -5287,3 +5482,4 @@ query RR
SELECT max(input_table.x), min(input_table.x) from input_table GROUP BY
input_table."row";
----
NaN NaN
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]