This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 011adc203e feat: support month and year interval for date_bin on
constant data (#5982)
011adc203e is described below
commit 011adc203e4a02a822a8f0008852b90fb7441264
Author: Nga Tran <[email protected]>
AuthorDate: Thu Apr 20 08:57:56 2023 -0400
feat: support month and year interval for date_bin on constant data (#5982)
* feat: support month and year interval for date_bin on constant data
* chore: update a test output
* refactor: address review comments and also fix a bug for origin that is
not midnight of the first date of the month
* feat: always create IntervalUnit::MonthDayNano for month interval instead
of DayTime
* chore: address review comment
* chore: remove duplicate test but on specified default origin. Also remove
default origin from the tests
* Add a few additional test cases and comment clarification
* chore: Apply suggestions from code review
Co-authored-by: Stuart Carnie <[email protected]>
* chore: use Stuart's interval function for enum and date_bin benchmark
* Revert "chore: use Stuart's interval function for enum and date_bin
benchmark"
This reverts commit b45d6e0eefdcafa57eb425ee23dca547f465cc10.
* chore: add stuart's new function back without benchmark
* test: add tests with origin on the 29th day of the month
---------
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Stuart Carnie <[email protected]>
---
.../tests/sqllogictests/test_files/timestamps.slt | 261 ++++++++++++++++++++-
datafusion/expr/src/function.rs | 8 +-
.../physical-expr/src/datetime_expressions.rs | 120 ++++++++--
3 files changed, 363 insertions(+), 26 deletions(-)
diff --git a/datafusion/core/tests/sqllogictests/test_files/timestamps.slt
b/datafusion/core/tests/sqllogictests/test_files/timestamps.slt
index 07c42c377b..63a2ecd654 100644
--- a/datafusion/core/tests/sqllogictests/test_files/timestamps.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/timestamps.slt
@@ -421,7 +421,7 @@ SELECT DATE_BIN(INTERVAL '5 microseconds', TIMESTAMP
'2022-08-03 14:38:50.000006
2022-08-03T14:38:50.000005
# Does not support months for Month-Day-Nano interval
-statement error This feature is not implemented: DATE_BIN stride does not
support month intervals
+statement error DataFusion error: This feature is not implemented: DATE_BIN
stride does not support combination of month, day and nanosecond intervals
SELECT DATE_BIN(INTERVAL '1 month 5 nanoseconds', TIMESTAMP '2022-08-03
14:38:50.000000006Z', TIMESTAMP '1970-01-01T00:00:00Z')
# Can coerce string interval arguments
@@ -500,6 +500,265 @@ FROM (
(TIMESTAMP '2021-06-10 17:19:10Z', TIMESTAMP '2001-01-01T00:00:00Z', 0.3)
) as t (time, origin, val)
+
+# month interval with INTERVAL keyword in date_bin with default start time
+query P
+select date_bin(INTERVAL '1 month', column1)
+from (values
+ (timestamp '2022-01-01 00:00:00'),
+ (timestamp '2022-01-01 01:00:00'),
+ (timestamp '2022-01-02 00:00:00'),
+ (timestamp '2022-02-02 00:00:00'),
+ (timestamp '2022-02-15 00:00:00'),
+ (timestamp '2022-03-31 00:00:00')
+) as sq
+----
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-02-01T00:00:00
+2022-02-01T00:00:00
+2022-03-01T00:00:00
+
+
+# year interval in date_bin with default start time
+query P
+select date_bin(INTERVAL '1 year', column1)
+from (values
+ (timestamp '2022-01-01 00:00:00'),
+ (timestamp '2023-01-01 01:00:00'),
+ (timestamp '2022-01-02 00:00:00'),
+ (timestamp '2022-02-02 00:00:00'),
+ (timestamp '2022-02-15 00:00:00'),
+ (timestamp '2022-03-31 00:00:00')
+) as sq
+----
+2022-01-01T00:00:00
+2023-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+
+query P
+SELECT DATE_BIN('1 month', '2022-01-01 00:00:00Z', '1970-01-01T00:00:00Z');
+----
+2022-01-01T00:00:00
+
+
+# Tests without INTERVAL keyword
+# 1-month interval in date_bin with default start time
+query P
+select date_bin('1 month', column1)
+from (values
+ (timestamp '2022-01-01 00:00:00'),
+ (timestamp '2022-01-01 01:00:00'),
+ (timestamp '2022-01-02 00:00:00'),
+ (timestamp '2022-02-02 00:00:00'),
+ (timestamp '2022-02-15 00:00:00'),
+ (timestamp '2022-03-31 00:00:00')
+) as sq
+----
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-02-01T00:00:00
+2022-02-01T00:00:00
+2022-03-01T00:00:00
+
+# 2-month interval in date_bin with default start time
+query P
+select date_bin('2 month', column1)
+from (values
+ (timestamp '2022-01-01 00:00:00'),
+ (timestamp '2022-01-01 01:00:00'),
+ (timestamp '2022-01-02 00:00:00'),
+ (timestamp '2022-02-02 00:00:00'),
+ (timestamp '2022-02-15 00:00:00'),
+ (timestamp '2022-03-31 00:00:00')
+) as sq
+----
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-03-01T00:00:00
+
+
+# 1-month interval with an origin at the end of a month plus some minutes
+#
+# The return value of `date_bin` is the start of the bin. The bin width is one month.
+# The source data must be inside the bin.
+# Since the origin is '1970-12-31T00:15:00Z', the bins start at
+# '1970-12-31T00:15:00Z',
+# '1971-01-31T00:15:00Z',
+# '1971-02-28T00:15:00Z',
+# ...,
+# '2021-12-31T00:15:00Z',
+# '2022-01-31T00:15:00Z',
+# '2022-02-28T00:15:00Z',
+# ...
+#
+# Note the datetime '2022-03-31 00:00:00'. Its bin is NOT '2022-03-31 00:15:00', which is later than the datetime itself.
+# Its bin is '2022-02-28T00:15:00'.
+#
+query P
+select date_bin('1 month', column1, '1970-12-31T00:15:00Z')
+from (values
+ (timestamp '2022-01-01 00:00:00'),
+ (timestamp '2022-01-01 01:00:00'),
+ (timestamp '2022-01-02 00:00:00'),
+ (timestamp '2022-02-02 00:00:00'),
+ (timestamp '2022-02-15 00:00:00'),
+ (timestamp '2022-03-31 00:00:00')
+) as sq
+----
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2022-01-31T00:15:00
+2022-01-31T00:15:00
+2022-02-28T00:15:00
+
+# 2-month interval with an origin at the end of a month plus some minutes
+query P
+select date_bin('2 months', column1, '1970-12-31T00:15:00Z')
+from (values
+ (timestamp '2022-01-01 00:00:00'),
+ (timestamp '2022-01-01 01:00:00'),
+ (timestamp '2022-01-02 00:00:00'),
+ (timestamp '2022-02-02 00:00:00'),
+ (timestamp '2022-02-15 00:00:00'),
+ (timestamp '2022-03-31 00:00:00')
+) as sq
+----
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2022-02-28T00:15:00
+
+# year interval in date_bin with default start time
+query P
+select date_bin('1 year', column1)
+from (values
+ (timestamp '2022-01-01 00:00:00'),
+ (timestamp '2022-01-01 01:00:00'),
+ (timestamp '2022-01-02 00:00:00'),
+ (timestamp '2022-02-02 00:00:00'),
+ (timestamp '2022-02-15 00:00:00'),
+ (timestamp '2022-03-31 00:00:00'),
+ (timestamp '2023-10-28 01:33:00')
+) as sq
+----
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2023-01-01T00:00:00
+
+# 1-year interval with an origin at the end of a month plus some minutes
+query P
+select date_bin('1 year', column1, '1970-12-31T00:15:00Z')
+from (values
+ (timestamp '2022-01-01 00:00:00'),
+ (timestamp '2022-01-01 01:00:00'),
+ (timestamp '2022-01-02 00:00:00'),
+ (timestamp '2022-02-02 00:00:00'),
+ (timestamp '2022-02-15 00:00:00'),
+ (timestamp '2022-03-31 00:00:00'),
+ (timestamp '2023-03-31 00:00:00')
+) as sq
+----
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2022-12-31T00:15:00
+
+# month interval on constant
+query P
+SELECT DATE_BIN('1 month', '2022-01-01 00:00:00Z');
+----
+2022-01-01T00:00:00
+
+# five months interval on constant
+query P
+SELECT DATE_BIN('5 month', '2022-01-01T00:00:00Z');
+----
+2021-09-01T00:00:00
+
+# month interval with default start time
+query P
+SELECT DATE_BIN('1 month', '2022-01-01 00:00:00Z');
+----
+2022-01-01T00:00:00
+
+# origin on the first date but not at midnight
+query P
+SELECT DATE_BIN('1 month', '2022-04-01T00:00:00Z', '2021-05-01T00:04:00Z');
+----
+2022-03-01T00:04:00
+
+# origin is May 31 (last date of the month) to produce bin on Feb 28
+query P
+SELECT DATE_BIN('3 month', '2022-04-01T00:00:00Z', '2021-05-31T00:04:00Z');
+----
+2022-02-28T00:04:00
+
+# The origin is Feb 29 and the interval is one month. The bins will be:
+# '2000-02-29T00:00:00'
+# '2000-01-29T00:00:00'
+# '1999-12-29T00:00:00'
+# ....
+# Reason: although the 29th (or the 28th in a non-leap year) is the last day of February,
+# it is not the last day of the other months. chrono's `Months` arithmetic therefore
+# lands on the same 29th day in the months before and after the origin.
+query P
+select date_bin('1 month', timestamp '2000-01-31T00:00:00', timestamp
'2000-02-29T00:00:00');
+----
+2000-01-29T00:00:00
+
+# similar for the origin March 29
+query P
+select date_bin('1 month', timestamp '2000-01-31T00:00:00', timestamp
'2000-03-29T00:00:00');
+----
+2000-01-29T00:00:00
+
+# any value of origin
+query P
+SELECT DATE_BIN('3 month', '2022-01-01T00:00:00Z', '2021-05-05T17:56:21Z');
+----
+2021-11-05T17:56:21
+
+# origin is later than source
+query P
+SELECT DATE_BIN('3 month', '2022-01-01T00:00:00Z', '2022-05-05T17:56:21Z');
+----
+2021-11-05T17:56:21
+
+# year interval on constant
+query P
+SELECT DATE_BIN('1 year', '2022-01-01 00:00:00Z');
+----
+2022-01-01T00:00:00
+
+# 3-year interval on constant
+query P
+SELECT DATE_BIN('3 year', '2022-01-01 00:00:00Z');
+----
+2021-01-01T00:00:00
+
+# 3 year 1 months = 37 months
+query P
+SELECT DATE_BIN('3 years 1 months', '2022-09-01 00:00:00Z');
+----
+2022-06-01T00:00:00
+
###
## test date_trunc function
###
diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs
index 37afd17956..236f803b9a 100644
--- a/datafusion/expr/src/function.rs
+++ b/datafusion/expr/src/function.rs
@@ -476,21 +476,21 @@ pub fn signature(fun: &BuiltinScalarFunction) ->
Signature {
BuiltinScalarFunction::DateBin => Signature::one_of(
vec![
TypeSignature::Exact(vec![
- DataType::Interval(IntervalUnit::DayTime),
+ DataType::Interval(IntervalUnit::MonthDayNano),
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
]),
TypeSignature::Exact(vec![
- DataType::Interval(IntervalUnit::MonthDayNano),
+ DataType::Interval(IntervalUnit::DayTime),
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
]),
TypeSignature::Exact(vec![
- DataType::Interval(IntervalUnit::DayTime),
+ DataType::Interval(IntervalUnit::MonthDayNano),
DataType::Timestamp(TimeUnit::Nanosecond, None),
]),
TypeSignature::Exact(vec![
- DataType::Interval(IntervalUnit::MonthDayNano),
+ DataType::Interval(IntervalUnit::DayTime),
DataType::Timestamp(TimeUnit::Nanosecond, None),
]),
],
diff --git a/datafusion/physical-expr/src/datetime_expressions.rs
b/datafusion/physical-expr/src/datetime_expressions.rs
index 6da65f6592..b128dcb163 100644
--- a/datafusion/physical-expr/src/datetime_expressions.rs
+++ b/datafusion/physical-expr/src/datetime_expressions.rs
@@ -33,7 +33,7 @@ use arrow::{
},
};
use chrono::prelude::*;
-use chrono::Duration;
+use chrono::{Duration, Months, NaiveDate};
use datafusion_common::cast::{
as_date32_array, as_date64_array, as_generic_string_array,
as_timestamp_microsecond_array, as_timestamp_millisecond_array,
@@ -333,19 +333,67 @@ pub fn date_trunc(args: &[ColumnarValue]) ->
Result<ColumnarValue> {
})
}
-fn date_bin_single(stride: i64, source: i64, origin: i64) -> i64 {
+// Return the start (in nanoseconds) of the bin that the source timestamp falls into,
+// given a stride in nanoseconds and the origin
+fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> i64
{
let time_diff = source - origin;
- // distance to bin
+
+ // distance from origin to bin
+ let time_delta = compute_distance(time_diff, stride_nanos);
+
+ origin + time_delta
+}
+
+// distance from origin to bin
+fn compute_distance(time_diff: i64, stride: i64) -> i64 {
let time_delta = time_diff - (time_diff % stride);
- let time_delta = if time_diff < 0 && stride > 1 {
+ if time_diff < 0 && stride > 1 {
// The origin is later than the source timestamp, round down to the
previous bin
time_delta - stride
} else {
time_delta
+ }
+}
+
+// Return the start (in nanoseconds) of the bin that the source timestamp falls into,
+// given a stride in calendar months and the origin
+fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) ->
i64 {
+ // convert source and origin to DateTime<Utc>
+ let source_date = to_utc_date_time(source);
+ let origin_date = to_utc_date_time(origin);
+
+ // calculate the number of months between the source and origin
+ let month_diff = (source_date.year() - origin_date.year()) * 12
+ + source_date.month() as i32
+ - origin_date.month() as i32;
+
+ // distance from origin to bin
+ let month_delta = compute_distance(month_diff as i64, stride_months);
+
+ let mut bin_time = if month_delta < 0 {
+ origin_date - Months::new(month_delta.unsigned_abs() as u32)
+ } else {
+ origin_date + Months::new(month_delta as u32)
};
- origin + time_delta
+ // If the origin is not midnight on the first day of a month, bin_time may be later than the source.
+ // In that case, move back to the previous bin.
+ if bin_time > source_date {
+ let month_delta = month_delta - stride_months;
+ bin_time = if month_delta < 0 {
+ origin_date - Months::new(month_delta.unsigned_abs() as u32)
+ } else {
+ origin_date + Months::new(month_delta as u32)
+ };
+ }
+
+ bin_time.timestamp_nanos()
+}
+
+fn to_utc_date_time(nanos: i64) -> DateTime<Utc> {
+ let secs = nanos / 1_000_000_000;
+ let nsec = (nanos % 1_000_000_000) as u32;
+ let date = NaiveDateTime::from_timestamp_opt(secs, nsec).unwrap();
+ DateTime::<Utc>::from_utc(date, Utc)
}
/// DATE_BIN sql function
@@ -366,6 +414,26 @@ pub fn date_bin(args: &[ColumnarValue]) ->
Result<ColumnarValue> {
}
}
+enum Interval {
+ Nanoseconds(i64),
+ Months(i64),
+}
+
+impl Interval {
+ fn bin_fn(&self) -> (i64, fn(i64, i64, i64) -> i64) {
+ match self {
+ Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
+ Interval::Months(months) => (*months, date_bin_months_interval),
+ }
+ }
+}
+
+// Supported intervals:
+// 1. IntervalDayTime: the stride is expressed in days, hours, minutes, seconds and milliseconds.
+// We assume a month interval is never converted into this type.
+// TODO (my next PR): without the `INTERVAL` keyword, a month stride was converted into
+// ScalarValue::IntervalDayTime somewhere. Find that conversion and make it produce
+// ScalarValue::IntervalMonthDayNano instead.
+// 2. IntervalMonthDayNano
fn date_bin_impl(
stride: &ColumnarValue,
array: &ColumnarValue,
@@ -376,8 +444,9 @@ fn date_bin_impl(
let (days, ms) = IntervalDayTimeType::to_parts(*v);
let nanos = (Duration::days(days as i64) +
Duration::milliseconds(ms as i64))
.num_nanoseconds();
+
match nanos {
- Some(v) => v,
+ Some(v) => Interval::Nanoseconds(v),
_ => {
return Err(DataFusionError::Execution(
"DATE_BIN stride argument is too large".to_string(),
@@ -387,19 +456,27 @@ fn date_bin_impl(
}
ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
+
+ // A month-based stride is binned on calendar months; it cannot be combined with days or nanoseconds
if months != 0 {
- return Err(DataFusionError::NotImplemented(
- "DATE_BIN stride does not support month
intervals".to_string(),
- ));
- }
- let nanos = (Duration::days(days as i64) +
Duration::nanoseconds(nanos))
- .num_nanoseconds();
- match nanos {
- Some(v) => v,
- _ => {
- return Err(DataFusionError::Execution(
- "DATE_BIN stride argument is too large".to_string(),
- ))
+ // Return error if days or nanos is not zero
+ if days != 0 || nanos != 0 {
+ return Err(DataFusionError::NotImplemented(
+ "DATE_BIN stride does not support combination of
month, day and nanosecond intervals".to_string(),
+ ));
+ } else {
+ Interval::Months(months as i64)
+ }
+ } else {
+ let nanos = (Duration::days(days as i64) +
Duration::nanoseconds(nanos))
+ .num_nanoseconds();
+ match nanos {
+ Some(v) => Interval::Nanoseconds(v),
+ _ => {
+ return Err(DataFusionError::Execution(
+ "DATE_BIN stride argument is too
large".to_string(),
+ ))
+ }
}
}
}
@@ -429,7 +506,8 @@ fn date_bin_impl(
)),
};
- let f = |x: Option<i64>| x.map(|x| date_bin_single(stride, x, origin));
+ let (stride, stride_fn) = stride.bin_fn();
+ let f = |x: Option<i64>| x.map(|x| stride_fn(stride, x, origin));
Ok(match array {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
@@ -823,7 +901,7 @@ mod tests {
let origin1 = string_to_timestamp_nanos(origin).unwrap();
let expected1 = string_to_timestamp_nanos(expected).unwrap();
- let result = date_bin_single(stride1, source1, origin1);
+ let result = date_bin_nanos_interval(stride1, source1,
origin1);
assert_eq!(result, expected1, "{source} = {expected}");
})
}
@@ -912,7 +990,7 @@ mod tests {
]);
assert_eq!(
res.err().unwrap().to_string(),
- "This feature is not implemented: DATE_BIN stride does not support
month intervals"
+ "This feature is not implemented: DATE_BIN stride does not support
combination of month, day and nanosecond intervals"
);
// origin: invalid type