This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 011adc203e feat: support month and year interval for date_bin on 
constant data (#5982)
011adc203e is described below

commit 011adc203e4a02a822a8f0008852b90fb7441264
Author: Nga Tran <[email protected]>
AuthorDate: Thu Apr 20 08:57:56 2023 -0400

    feat: support month and year interval for date_bin on constant data (#5982)
    
    * feat: support month and year interval for date_bin on constant data
    
    * chore: update a test output
    
    * refactor: address review comments and also fix a bug for an origin that is not midnight of the first day of the month
    
    * feat: always create IntervalUnit::MonthDayNano for month interval instead 
of DayTime
    
    * chore: address review comment
    
    * chore: remove duplicate test but on specified default origin. Also remove 
default origin from the tests
    
    * Add a few additional test cases and comment clarification
    
    * chore: Apply suggestions from code review
    
    Co-authored-by: Stuart Carnie <[email protected]>
    
    * chore: use Stuart's interval function for enum and date_bin benchmark
    
    * Revert "chore: use Stuart's interval function for enum and date_bin benchmark"
    
    This reverts commit b45d6e0eefdcafa57eb425ee23dca547f465cc10.
    
    * chore: add stuart's new function back without benchmark
    
    * test: add tests with origin on the 29th day of the month
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
    Co-authored-by: Stuart Carnie <[email protected]>
---
 .../tests/sqllogictests/test_files/timestamps.slt  | 261 ++++++++++++++++++++-
 datafusion/expr/src/function.rs                    |   8 +-
 .../physical-expr/src/datetime_expressions.rs      | 120 ++++++++--
 3 files changed, 363 insertions(+), 26 deletions(-)

diff --git a/datafusion/core/tests/sqllogictests/test_files/timestamps.slt 
b/datafusion/core/tests/sqllogictests/test_files/timestamps.slt
index 07c42c377b..63a2ecd654 100644
--- a/datafusion/core/tests/sqllogictests/test_files/timestamps.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/timestamps.slt
@@ -421,7 +421,7 @@ SELECT DATE_BIN(INTERVAL '5 microseconds', TIMESTAMP 
'2022-08-03 14:38:50.000006
 2022-08-03T14:38:50.000005
 
 # Does not support months for Month-Day-Nano interval
-statement error This feature is not implemented: DATE_BIN stride does not 
support month intervals
+statement error DataFusion error: This feature is not implemented: DATE_BIN 
stride does not support combination of month, day and nanosecond intervals
 SELECT DATE_BIN(INTERVAL '1 month 5 nanoseconds', TIMESTAMP '2022-08-03 
14:38:50.000000006Z', TIMESTAMP '1970-01-01T00:00:00Z')
 
 # Can coerce string interval arguments
@@ -500,6 +500,265 @@ FROM (
     (TIMESTAMP '2021-06-10 17:19:10Z', TIMESTAMP '2001-01-01T00:00:00Z', 0.3)
   ) as t (time, origin, val)
 
+
+# month interval with INTERVAL keyword in date_bin with default start time
+query P
+select date_bin(INTERVAL '1 month', column1)
+from (values
+  (timestamp '2022-01-01 00:00:00'),
+  (timestamp '2022-01-01 01:00:00'),
+  (timestamp '2022-01-02 00:00:00'),
+  (timestamp '2022-02-02 00:00:00'),
+  (timestamp '2022-02-15 00:00:00'),
+  (timestamp '2022-03-31 00:00:00')
+) as sq
+----
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-02-01T00:00:00
+2022-02-01T00:00:00
+2022-03-01T00:00:00
+
+
+# year interval in date_bin with default start time
+query P
+select date_bin(INTERVAL '1 year', column1)
+from (values
+  (timestamp '2022-01-01 00:00:00'),
+  (timestamp '2023-01-01 01:00:00'),
+  (timestamp '2022-01-02 00:00:00'),
+  (timestamp '2022-02-02 00:00:00'),
+  (timestamp '2022-02-15 00:00:00'),
+  (timestamp '2022-03-31 00:00:00')
+) as sq
+----
+2022-01-01T00:00:00
+2023-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+
+query P
+SELECT DATE_BIN('1 month', '2022-01-01 00:00:00Z', '1970-01-01T00:00:00Z');
+----
+2022-01-01T00:00:00
+
+
+# Tests without INTERVAL keyword
+# 1-month interval in date_bin with default start time
+query P
+select date_bin('1 month', column1)
+from (values
+  (timestamp '2022-01-01 00:00:00'),
+  (timestamp '2022-01-01 01:00:00'),
+  (timestamp '2022-01-02 00:00:00'),
+  (timestamp '2022-02-02 00:00:00'),
+  (timestamp '2022-02-15 00:00:00'),
+  (timestamp '2022-03-31 00:00:00')
+) as sq
+----
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-02-01T00:00:00
+2022-02-01T00:00:00
+2022-03-01T00:00:00
+
+# 2-month interval in date_bin with default start time
+query P
+select date_bin('2 month', column1)
+from (values
+  (timestamp '2022-01-01 00:00:00'),
+  (timestamp '2022-01-01 01:00:00'),
+  (timestamp '2022-01-02 00:00:00'),
+  (timestamp '2022-02-02 00:00:00'),
+  (timestamp '2022-02-15 00:00:00'),
+  (timestamp '2022-03-31 00:00:00')
+) as sq
+----
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-03-01T00:00:00
+
+
+# month interval with origin at the end of the month plus some minutes
+#
+# The return value of `date_bin` is the start of the bin. The bin width is one month.
+# The source data must be inside the bin.
+# Since the origin is '1970-12-31T00:15:00Z', the starts of the bins are
+#   '1970-12-31T00:15:00Z',
+#   '1971-01-31T00:15:00Z',
+#   '1971-02-28T00:15:00Z',
+#   ...,
+#   '2021-12-31T00:15:00Z',
+#   '2022-01-31T00:15:00Z',
+#   '2022-02-28T00:15:00Z',
+#   ...
+#
+# Note the datetime '2022-03-31 00:00:00'. Its bin is NOT '2022-03-31T00:15:00', which is after it.
+# Its bin is '2022-02-28T00:15:00'.
+#
+query P
+select date_bin('1 month', column1, '1970-12-31T00:15:00Z')
+from (values
+  (timestamp '2022-01-01 00:00:00'),
+  (timestamp '2022-01-01 01:00:00'),
+  (timestamp '2022-01-02 00:00:00'),
+  (timestamp '2022-02-02 00:00:00'),
+  (timestamp '2022-02-15 00:00:00'),
+  (timestamp '2022-03-31 00:00:00')
+) as sq
+----
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2022-01-31T00:15:00
+2022-01-31T00:15:00
+2022-02-28T00:15:00
+
+# 2-month interval with origin at the end of the month plus some minutes
+query P
+select date_bin('2 months', column1, '1970-12-31T00:15:00Z')
+from (values
+  (timestamp '2022-01-01 00:00:00'),
+  (timestamp '2022-01-01 01:00:00'),
+  (timestamp '2022-01-02 00:00:00'),
+  (timestamp '2022-02-02 00:00:00'),
+  (timestamp '2022-02-15 00:00:00'),
+  (timestamp '2022-03-31 00:00:00')
+) as sq
+----
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2022-02-28T00:15:00
+
+# year interval in date_bin with default start time
+query P
+select date_bin('1 year', column1)
+from (values
+  (timestamp '2022-01-01 00:00:00'),
+  (timestamp '2022-01-01 01:00:00'),
+  (timestamp '2022-01-02 00:00:00'),
+  (timestamp '2022-02-02 00:00:00'),
+  (timestamp '2022-02-15 00:00:00'),
+  (timestamp '2022-03-31 00:00:00'),
+  (timestamp '2023-10-28 01:33:00')
+) as sq
+----
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2022-01-01T00:00:00
+2023-01-01T00:00:00
+
+# year interval with origin at the end of the month plus some minutes
+query P
+select date_bin('1 year', column1, '1970-12-31T00:15:00Z')
+from (values
+  (timestamp '2022-01-01 00:00:00'),
+  (timestamp '2022-01-01 01:00:00'),
+  (timestamp '2022-01-02 00:00:00'),
+  (timestamp '2022-02-02 00:00:00'),
+  (timestamp '2022-02-15 00:00:00'),
+  (timestamp '2022-03-31 00:00:00'),
+  (timestamp '2023-03-31 00:00:00')
+) as sq
+----
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2021-12-31T00:15:00
+2022-12-31T00:15:00
+
+# month interval on constant
+query P
+SELECT DATE_BIN('1 month', '2022-01-01 00:00:00Z');
+----
+2022-01-01T00:00:00
+
+# five months interval on constant
+query P
+SELECT DATE_BIN('5 month', '2022-01-01T00:00:00Z');
+----
+2021-09-01T00:00:00
+
+# month interval with default start time
+query P
+SELECT DATE_BIN('1 month', '2022-01-01 00:00:00Z');
+----
+2022-01-01T00:00:00
+
+# origin on the first date but not at midnight
+query P
+SELECT DATE_BIN('1 month', '2022-04-01T00:00:00Z', '2021-05-01T00:04:00Z');
+----
+2022-03-01T00:04:00
+
+# origin is May 31 (last date of the month) to produce bin on Feb 28
+query P
+SELECT DATE_BIN('3 month', '2022-04-01T00:00:00Z', '2021-05-31T00:04:00Z');
+----
+2022-02-28T00:04:00
+
+# origin is on Feb 29 and interval is one month. The bins will be:
+# '2000-02-29T00:00:00'
+# '2000-01-29T00:00:00'
+# '1999-12-29T00:00:00'
+# ....
+# Reason: even though the 29th is the last day of February in a leap year (the 28th
+# otherwise), it is not the last day of the other months. chrono's month arithmetic keeps
+# the day of month when moving one or more months before or after the origin, so the
+# bins stay on the 29th (clamped only when the target month is shorter).
+query P
+select date_bin('1 month', timestamp '2000-01-31T00:00:00', timestamp 
'2000-02-29T00:00:00');
+----
+2000-01-29T00:00:00
+
+# similar for the origin March 29
+query P
+select date_bin('1 month', timestamp '2000-01-31T00:00:00', timestamp 
'2000-03-29T00:00:00');
+----
+2000-01-29T00:00:00
+
+# any value of origin
+query P
+SELECT DATE_BIN('3 month', '2022-01-01T00:00:00Z', '2021-05-05T17:56:21Z');
+----
+2021-11-05T17:56:21
+
+# origin is later than source
+query P
+SELECT DATE_BIN('3 month', '2022-01-01T00:00:00Z', '2022-05-05T17:56:21Z');
+----
+2021-11-05T17:56:21
+
+# year interval on constant
+query P
+SELECT DATE_BIN('1 year', '2022-01-01 00:00:00Z');
+----
+2022-01-01T00:00:00
+
+# 3-year interval on constant
+query P
+SELECT DATE_BIN('3 year', '2022-01-01 00:00:00Z');
+----
+2021-01-01T00:00:00
+
+# 3 year 1 months = 37 months
+query P
+SELECT DATE_BIN('3 years 1 months', '2022-09-01 00:00:00Z');
+----
+2022-06-01T00:00:00
+
 ###
 ## test date_trunc function
 ###
diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs
index 37afd17956..236f803b9a 100644
--- a/datafusion/expr/src/function.rs
+++ b/datafusion/expr/src/function.rs
@@ -476,21 +476,21 @@ pub fn signature(fun: &BuiltinScalarFunction) -> 
Signature {
         BuiltinScalarFunction::DateBin => Signature::one_of(
             vec![
                 TypeSignature::Exact(vec![
-                    DataType::Interval(IntervalUnit::DayTime),
+                    DataType::Interval(IntervalUnit::MonthDayNano),
                     DataType::Timestamp(TimeUnit::Nanosecond, None),
                     DataType::Timestamp(TimeUnit::Nanosecond, None),
                 ]),
                 TypeSignature::Exact(vec![
-                    DataType::Interval(IntervalUnit::MonthDayNano),
+                    DataType::Interval(IntervalUnit::DayTime),
                     DataType::Timestamp(TimeUnit::Nanosecond, None),
                     DataType::Timestamp(TimeUnit::Nanosecond, None),
                 ]),
                 TypeSignature::Exact(vec![
-                    DataType::Interval(IntervalUnit::DayTime),
+                    DataType::Interval(IntervalUnit::MonthDayNano),
                     DataType::Timestamp(TimeUnit::Nanosecond, None),
                 ]),
                 TypeSignature::Exact(vec![
-                    DataType::Interval(IntervalUnit::MonthDayNano),
+                    DataType::Interval(IntervalUnit::DayTime),
                     DataType::Timestamp(TimeUnit::Nanosecond, None),
                 ]),
             ],
diff --git a/datafusion/physical-expr/src/datetime_expressions.rs 
b/datafusion/physical-expr/src/datetime_expressions.rs
index 6da65f6592..b128dcb163 100644
--- a/datafusion/physical-expr/src/datetime_expressions.rs
+++ b/datafusion/physical-expr/src/datetime_expressions.rs
@@ -33,7 +33,7 @@ use arrow::{
     },
 };
 use chrono::prelude::*;
-use chrono::Duration;
+use chrono::{Duration, Months, NaiveDate};
 use datafusion_common::cast::{
     as_date32_array, as_date64_array, as_generic_string_array,
     as_timestamp_microsecond_array, as_timestamp_millisecond_array,
@@ -333,19 +333,67 @@ pub fn date_trunc(args: &[ColumnarValue]) -> 
Result<ColumnarValue> {
     })
 }
 
-fn date_bin_single(stride: i64, source: i64, origin: i64) -> i64 {
+/// Returns the start (as a nanosecond timestamp) of the bin that `source` falls into,
+/// where bins are `stride_nanos` nanoseconds wide and aligned to `origin`.
+fn date_bin_nanos_interval(stride_nanos: i64, source: i64, origin: i64) -> i64 {
     let time_diff = source - origin;
-    // distance to bin
+
+    // distance from origin to the start of the bin containing `source`
+    let time_delta = compute_distance(time_diff, stride_nanos);
+
+    origin + time_delta
+}
+
+/// Returns the distance from the origin to the start of the bin containing a point
+/// that lies `time_diff` away from the origin, for bins `stride` units wide.
+///
+/// For a positive `stride` the result is a multiple of `stride` that is never greater
+/// than `time_diff`, i.e. the bin start is at or before the source.
+fn compute_distance(time_diff: i64, stride: i64) -> i64 {
     let time_delta = time_diff - (time_diff % stride);
 
-    let time_delta = if time_diff < 0 && stride > 1 {
+    // `%` truncates toward zero, so a negative `time_diff` that is not an exact multiple
+    // of `stride` leaves `time_delta` one bin too late. An exact multiple is already the
+    // start of its own bin and must not be moved back another stride.
+    if time_diff < 0 && stride > 1 && time_delta != time_diff {
         // The origin is later than the source timestamp, round down to the previous bin
         time_delta - stride
     } else {
         time_delta
+    }
+}
+
+/// Returns the start (as a nanosecond timestamp) of the bin that `source` falls into,
+/// where bins are `stride_months` calendar months wide and aligned to `origin`.
+///
+/// Each bin start is `origin` shifted by a whole multiple of `stride_months` months using
+/// chrono's `Months` arithmetic, so the day of month follows `origin` and is clamped to the
+/// end of shorter months (e.g. an origin on the 31st yields a bin starting on Feb 28).
+fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> i64 {
+    // convert source and origin to DateTime<Utc>
+    let source_date = to_utc_date_time(source);
+    let origin_date = to_utc_date_time(origin);
+
+    // calculate the number of months between the source and origin; only the year and
+    // month fields are compared here, day and time of day are handled further below
+    let month_diff = (source_date.year() - origin_date.year()) * 12
+        + source_date.month() as i32
+        - origin_date.month() as i32;
+
+    // distance from origin to bin, in months
+    let month_delta = compute_distance(month_diff as i64, stride_months);
+
+    // `Months::new` takes an unsigned count, so a negative delta is subtracted instead
+    let mut bin_time = if month_delta < 0 {
+        origin_date - Months::new(month_delta.unsigned_abs() as u32)
+    } else {
+        origin_date + Months::new(month_delta as u32)
     };
 
-    origin + time_delta
+    // Because `month_diff` ignores the day and time of day, when the origin is not at
+    // midnight on the first day of the month the candidate bin_time may be later than
+    // the source (same month, later day/time). In that case move back to the previous bin.
+    if bin_time > source_date {
+        let month_delta = month_delta - stride_months;
+        bin_time = if month_delta < 0 {
+            origin_date - Months::new(month_delta.unsigned_abs() as u32)
+        } else {
+            origin_date + Months::new(month_delta as u32)
+        };
+    }
+
+    bin_time.timestamp_nanos()
+}
+
+/// Converts a nanosecond timestamp (nanoseconds since the Unix epoch) into a `DateTime<Utc>`.
+///
+/// Euclidean division keeps the sub-second part non-negative for pre-1970 timestamps.
+/// With truncating `/` and `%`, a negative remainder would wrap to a huge value when cast
+/// to `u32`, making `from_timestamp_opt` return `None` and the `unwrap` panic.
+fn to_utc_date_time(nanos: i64) -> DateTime<Utc> {
+    let secs = nanos.div_euclid(1_000_000_000);
+    let nsec = nanos.rem_euclid(1_000_000_000) as u32;
+    let date = NaiveDateTime::from_timestamp_opt(secs, nsec).unwrap();
+    DateTime::<Utc>::from_utc(date, Utc)
 }
 
 /// DATE_BIN sql function
@@ -366,6 +414,26 @@ pub fn date_bin(args: &[ColumnarValue]) -> 
Result<ColumnarValue> {
     }
 }
 
+/// A `DATE_BIN` stride, normalized to the unit it is binned in.
+enum Interval {
+    /// Fixed-width stride in nanoseconds (day and sub-day parts converted to nanoseconds).
+    Nanoseconds(i64),
+    /// Calendar stride in whole months (years are carried as multiples of 12 months).
+    Months(i64),
+}
+
+impl Interval {
+    /// Returns the stride value together with the binning function for its unit.
+    /// The function is invoked as `f(stride, source, origin)`.
+    fn bin_fn(&self) -> (i64, fn(i64, i64, i64) -> i64) {
+        match self {
+            Interval::Nanoseconds(nanos) => (*nanos, date_bin_nanos_interval),
+            Interval::Months(months) => (*months, date_bin_months_interval),
+        }
+    }
+}
+
+// Supported stride types:
+//  1. IntervalDayTime: the stride is in days, hours, minutes, seconds and milliseconds.
+//     A month interval is assumed never to be converted into this type.
+//     TODO: without the `INTERVAL` keyword, a month stride used to be converted into
+//     ScalarValue::IntervalDayTime somewhere; find that conversion and produce
+//     ScalarValue::IntervalMonthDayNano instead.
+//  2. IntervalMonthDayNano: the stride is either a whole number of months, or a
+//     combination of days and nanoseconds (months cannot be mixed with days/nanoseconds).
 fn date_bin_impl(
     stride: &ColumnarValue,
     array: &ColumnarValue,
@@ -376,8 +444,9 @@ fn date_bin_impl(
             let (days, ms) = IntervalDayTimeType::to_parts(*v);
             let nanos = (Duration::days(days as i64) + 
Duration::milliseconds(ms as i64))
                 .num_nanoseconds();
+
             match nanos {
-                Some(v) => v,
+                Some(v) => Interval::Nanoseconds(v),
                 _ => {
                     return Err(DataFusionError::Execution(
                         "DATE_BIN stride argument is too large".to_string(),
@@ -387,19 +456,27 @@ fn date_bin_impl(
         }
         ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(v))) => {
             let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(*v);
+
+            // A month stride is binned by calendar months, so it cannot be combined
+            // with a day or nanosecond component.
             if months != 0 {
-                return Err(DataFusionError::NotImplemented(
-                    "DATE_BIN stride does not support month 
intervals".to_string(),
-                ));
-            }
-            let nanos = (Duration::days(days as i64) + 
Duration::nanoseconds(nanos))
-                .num_nanoseconds();
-            match nanos {
-                Some(v) => v,
-                _ => {
-                    return Err(DataFusionError::Execution(
-                        "DATE_BIN stride argument is too large".to_string(),
-                    ))
+                // Return error if days or nanos is not zero
+                if days != 0 || nanos != 0 {
+                    return Err(DataFusionError::NotImplemented(
+                        "DATE_BIN stride does not support combination of 
month, day and nanosecond intervals".to_string(),
+                    ));
+                } else {
+                    Interval::Months(months as i64)
+                }
+            } else {
+                let nanos = (Duration::days(days as i64) + 
Duration::nanoseconds(nanos))
+                    .num_nanoseconds();
+                match nanos {
+                    Some(v) => Interval::Nanoseconds(v),
+                    _ => {
+                        return Err(DataFusionError::Execution(
+                            "DATE_BIN stride argument is too 
large".to_string(),
+                        ))
+                    }
                 }
             }
         }
@@ -429,7 +506,8 @@ fn date_bin_impl(
         )),
     };
 
-    let f = |x: Option<i64>| x.map(|x| date_bin_single(stride, x, origin));
+    let (stride, stride_fn) = stride.bin_fn();
+    let f = |x: Option<i64>| x.map(|x| stride_fn(stride, x, origin));
 
     Ok(match array {
         ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
@@ -823,7 +901,7 @@ mod tests {
                 let origin1 = string_to_timestamp_nanos(origin).unwrap();
 
                 let expected1 = string_to_timestamp_nanos(expected).unwrap();
-                let result = date_bin_single(stride1, source1, origin1);
+                let result = date_bin_nanos_interval(stride1, source1, 
origin1);
                 assert_eq!(result, expected1, "{source} = {expected}");
             })
     }
@@ -912,7 +990,7 @@ mod tests {
         ]);
         assert_eq!(
             res.err().unwrap().to_string(),
-            "This feature is not implemented: DATE_BIN stride does not support 
month intervals"
+            "This feature is not implemented: DATE_BIN stride does not support 
combination of month, day and nanosecond intervals"
         );
 
         // origin: invalid type

Reply via email to