This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a63c871f Add support for month & year intervals (#2797)
5a63c871f is described below

commit 5a63c871fe2b426f2c513d5553aaa04819eb833d
Author: Brent Gardner <[email protected]>
AuthorDate: Tue Jul 12 09:46:51 2022 -0600

    Add support for month & year intervals (#2797)
---
 datafusion/common/src/scalar.rs                    |  15 +-
 datafusion/core/tests/sql/timestamp.rs             | 120 ++++++
 datafusion/optimizer/src/simplify_expressions.rs   |   8 +-
 .../physical-expr/src/expressions/datetime.rs      | 424 +++++++++++++++++----
 datafusion/physical-expr/src/expressions/delta.rs  | 182 +++++++++
 datafusion/physical-expr/src/expressions/mod.rs    |   1 +
 6 files changed, 673 insertions(+), 77 deletions(-)

diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 76d94677c..25bb96a40 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -38,6 +38,8 @@ use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc};
 
 /// Represents a dynamically typed, nullable single value.
 /// This is the single-valued counter-part of arrow’s `Array`.
+/// https://arrow.apache.org/docs/python/api/datatypes.html
+/// https://github.com/apache/arrow/blob/master/format/Schema.fbs#L354-L375
 #[derive(Clone)]
 pub enum ScalarValue {
     /// represents `DataType::Null` (castable to/from any other type)
@@ -76,9 +78,9 @@ pub enum ScalarValue {
     LargeBinary(Option<Vec<u8>>),
     /// list of nested ScalarValue
     List(Option<Vec<ScalarValue>>, Box<DataType>),
-    /// Date stored as a signed 32bit int
+    /// Date stored as a signed 32bit int days since UNIX epoch 1970-01-01
     Date32(Option<i32>),
-    /// Date stored as a signed 64bit int
+    /// Date stored as a signed 64bit int milliseconds since UNIX epoch 
1970-01-01
     Date64(Option<i64>),
     /// Timestamp Second
     TimestampSecond(Option<i64>, Option<String>),
@@ -88,11 +90,14 @@ pub enum ScalarValue {
     TimestampMicrosecond(Option<i64>, Option<String>),
     /// Timestamp Nanoseconds
     TimestampNanosecond(Option<i64>, Option<String>),
-    /// Interval with YearMonth unit
+    /// Number of elapsed whole months
     IntervalYearMonth(Option<i32>),
-    /// Interval with DayTime unit
+    /// Number of elapsed days and milliseconds (no leap seconds)
+    /// stored as 2 contiguous 32-bit signed integers
     IntervalDayTime(Option<i64>),
-    /// Interval with MonthDayNano unit
+    /// A triple of the number of elapsed months, days, and nanoseconds.
+    /// Months and days are encoded as 32-bit signed integers.
+    /// Nanoseconds is encoded as a 64-bit signed integer (no leap seconds).
     IntervalMonthDayNano(Option<i128>),
     /// struct of nested ScalarValue
     Struct(Option<Vec<ScalarValue>>, Box<Vec<Field>>),
diff --git a/datafusion/core/tests/sql/timestamp.rs 
b/datafusion/core/tests/sql/timestamp.rs
index 1e475fb17..9acc3f3cb 100644
--- a/datafusion/core/tests/sql/timestamp.rs
+++ b/datafusion/core/tests/sql/timestamp.rs
@@ -814,3 +814,123 @@ async fn group_by_timestamp_millis() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+/// Adding a one-year interval to a date literal moves it to the same day of the next year.
+#[tokio::test]
+async fn interval_year() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "select date '1994-01-01' + interval '1' year as date;";
+    let results = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec![
+        "+------------+",
+        "| date       |",
+        "+------------+",
+        "| 1995-01-01 |",
+        "+------------+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
+
+/// Adding one month to January 31st clamps the day to the last day of February.
+#[tokio::test]
+async fn add_interval_month() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "select date '1994-01-31' + interval '1' month as date;";
+    let results = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec![
+        "+------------+",
+        "| date       |",
+        "+------------+",
+        "| 1994-02-28 |",
+        "+------------+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
+
+/// Subtracting one month from March 31st clamps the day to the last day of February.
+#[tokio::test]
+async fn sub_interval_month() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "select date '1994-03-31' - interval '1' month as date;";
+    let results = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec![
+        "+------------+",
+        "| date       |",
+        "+------------+",
+        "| 1994-02-28 |",
+        "+------------+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
+
+/// Subtracting one month from a January date wraps back to December of the previous year.
+#[tokio::test]
+async fn sub_month_wrap() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "select date '1994-01-15' - interval '1' month as date;";
+    let results = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec![
+        "+------------+",
+        "| date       |",
+        "+------------+",
+        "| 1993-12-15 |",
+        "+------------+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
+
+/// Adding a one-day interval advances a date literal by one calendar day.
+#[tokio::test]
+async fn add_interval_day() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "select date '1994-01-15' + interval '1' day as date;";
+    let results = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec![
+        "+------------+",
+        "| date       |",
+        "+------------+",
+        "| 1994-01-16 |",
+        "+------------+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
+
+/// Subtracting one day from January 1st crosses into December 31st of the previous year.
+#[tokio::test]
+async fn sub_interval_day() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "select date '1994-01-01' - interval '1' day as date;";
+    let results = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec![
+        "+------------+",
+        "| date       |",
+        "+------------+",
+        "| 1993-12-31 |",
+        "+------------+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
diff --git a/datafusion/optimizer/src/simplify_expressions.rs 
b/datafusion/optimizer/src/simplify_expressions.rs
index dd63da989..8974639ea 100644
--- a/datafusion/optimizer/src/simplify_expressions.rs
+++ b/datafusion/optimizer/src/simplify_expressions.rs
@@ -1946,7 +1946,7 @@ mod tests {
         let date_plus_interval_expr = to_timestamp_expr(ts_string)
             .cast_to(&DataType::Date32, schema)
             .unwrap()
-            + Expr::Literal(ScalarValue::IntervalDayTime(Some(123)));
+            + Expr::Literal(ScalarValue::IntervalDayTime(Some(123i64 << 32)));
 
         let plan = LogicalPlanBuilder::from(table_scan.clone())
             .project(vec![date_plus_interval_expr])
@@ -1958,10 +1958,10 @@ mod tests {
 
         // Note that constant folder runs and folds the entire
         // expression down to a single constant (true)
-        let expected = "Projection: Date32(\"18636\") AS 
CAST(totimestamp(Utf8(\"2020-09-08T12:05:00+00:00\")) AS Date32) + 
IntervalDayTime(\"123\")\
-            \n  TableScan: test";
+        let expected = r#"Projection: Date32("18636") AS 
CAST(totimestamp(Utf8("2020-09-08T12:05:00+00:00")) AS Date32) + 
IntervalDayTime("528280977408")
+  TableScan: test"#;
         let actual = get_optimized_plan_formatted(&plan, &time);
 
-        assert_eq!(expected, actual);
+        assert_eq!(actual, expected);
     }
 }
diff --git a/datafusion/physical-expr/src/expressions/datetime.rs 
b/datafusion/physical-expr/src/expressions/datetime.rs
index 3d84e79f2..d4486a3ff 100644
--- a/datafusion/physical-expr/src/expressions/datetime.rs
+++ b/datafusion/physical-expr/src/expressions/datetime.rs
@@ -15,14 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::expressions::delta::shift_months;
 use crate::PhysicalExpr;
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
+use chrono::{Duration, NaiveDate};
 use datafusion_common::Result;
 use datafusion_common::{DataFusionError, ScalarValue};
 use datafusion_expr::{ColumnarValue, Operator};
 use std::any::Any;
 use std::fmt::{Display, Formatter};
+use std::ops::{Add, Sub};
 use std::sync::Arc;
 
 /// Perform DATE +/ INTERVAL math
@@ -74,88 +77,373 @@ impl PhysicalExpr for DateIntervalExpr {
         self
     }
 
-    fn data_type(&self, input_schema: &Schema) -> 
datafusion_common::Result<DataType> {
+    /// The result has the same data type as the date operand (`lhs`).
+    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
         self.lhs.data_type(input_schema)
     }
 
-    fn nullable(&self, input_schema: &Schema) -> 
datafusion_common::Result<bool> {
+    /// Nullability is taken from the date operand (`lhs`) only.
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
         self.lhs.nullable(input_schema)
     }
 
-    fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result<ColumnarValue> {
+    /// Evaluates `lhs (+|-) rhs` where `lhs` is a scalar Date32/Date64 and `rhs` is a
+    /// scalar interval (DayTime, YearMonth or MonthDayNano).
+    ///
+    /// A typed NULL date or typed NULL interval yields a NULL of the date's type.
+    /// Columnar (array) inputs are not supported yet and return an error.
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         let dates = self.lhs.evaluate(batch)?;
         let intervals = self.rhs.evaluate(batch)?;
 
-        let interval = match intervals {
-            ColumnarValue::Scalar(interval) => match interval {
-                ScalarValue::IntervalDayTime(Some(interval)) => interval as i32,
-                ScalarValue::IntervalYearMonth(Some(_)) => {
-                    return Err(DataFusionError::Execution(
-                        "DateIntervalExpr does not support IntervalYearMonth".to_string(),
-                    ))
-                }
-                ScalarValue::IntervalMonthDayNano(Some(_)) => {
-                    return Err(DataFusionError::Execution(
-                        "DateIntervalExpr does not support IntervalMonthDayNano"
-                            .to_string(),
-                    ))
-                }
-                other => {
-                    return Err(DataFusionError::Execution(format!(
-                        "DateIntervalExpr does not support non-interval type {:?}",
-                        other
-                    )))
-                }
-            },
-            _ => {
-                return Err(DataFusionError::Execution(
-                    "Columnar execution is not yet supported for DateIntervalExpr"
-                        .to_string(),
-                ))
-            }
+        // Unwrap days since epoch
+        let operand = match dates {
+            ColumnarValue::Scalar(scalar) => scalar,
+            _ => Err(DataFusionError::Execution(
+                "Columnar execution is not yet supported for DateIntervalExpr"
+                    .to_string(),
+            ))?,
         };
 
-        match dates {
-            ColumnarValue::Scalar(scalar) => match scalar {
-                ScalarValue::Date32(Some(date)) => match &self.op {
-                    Operator::Plus => Ok(ColumnarValue::Scalar(ScalarValue::Date32(
-                        Some(date + interval),
-                    ))),
-                    Operator::Minus => Ok(ColumnarValue::Scalar(ScalarValue::Date32(
-                        Some(date - interval),
-                    ))),
-                    _ => {
-                        // this should be unreachable because we check the operators in `try_new`
-                        Err(DataFusionError::Execution(
-                            "Invalid operator for DateIntervalExpr".to_string(),
-                        ))
-                    }
-                },
-                ScalarValue::Date64(Some(date)) => match &self.op {
-                    Operator::Plus => Ok(ColumnarValue::Scalar(ScalarValue::Date64(
-                        Some(date + interval as i64),
-                    ))),
-                    Operator::Minus => Ok(ColumnarValue::Scalar(ScalarValue::Date64(
-                        Some(date - interval as i64),
-                    ))),
-                    _ => {
-                        // this should be unreachable because we check the operators in `try_new`
-                        Err(DataFusionError::Execution(
-                            "Invalid operator for DateIntervalExpr".to_string(),
-                        ))
-                    }
-                },
-                _ => {
-                    // this should be unreachable because we check the types in `try_new`
-                    Err(DataFusionError::Execution(
-                        "Invalid lhs type for DateIntervalExpr".to_string(),
-                    ))
-                }
-            },
+        // SQL semantics: a NULL date plus/minus any interval is NULL (of the same type)
+        if matches!(operand, ScalarValue::Date32(None) | ScalarValue::Date64(None)) {
+            return Ok(ColumnarValue::Scalar(operand));
+        }
+
+        // Convert to NaiveDate
+        let epoch = NaiveDate::from_ymd(1970, 1, 1);
+        let prior = match operand {
+            ScalarValue::Date32(Some(d)) => epoch.add(Duration::days(d as i64)),
+            ScalarValue::Date64(Some(ms)) => epoch.add(Duration::milliseconds(ms)),
+            _ => Err(DataFusionError::Execution(format!(
+                "Invalid lhs type for DateIntervalExpr: {:?}",
+                operand
+            )))?,
+        };
+
+        // Unwrap interval to add
+        let scalar = match &intervals {
+            ColumnarValue::Scalar(interval) => interval,
             _ => Err(DataFusionError::Execution(
                 "Columnar execution is not yet supported for DateIntervalExpr"
                     .to_string(),
-            )),
+            ))?,
+        };
+
+        // Invert sign for subtraction
+        let sign = match &self.op {
+            Operator::Plus => 1,
+            Operator::Minus => -1,
+            _ => {
+                // this should be unreachable because we check the operators in `try_new`
+                Err(DataFusionError::Execution(
+                    "Invalid operator for DateIntervalExpr".to_string(),
+                ))?
+            }
+        };
+
+        // A typed NULL interval also yields NULL, typed like the date operand
+        if matches!(
+            scalar,
+            ScalarValue::IntervalDayTime(None)
+                | ScalarValue::IntervalYearMonth(None)
+                | ScalarValue::IntervalMonthDayNano(None)
+        ) {
+            let null = match operand {
+                ScalarValue::Date32(_) => ScalarValue::Date32(None),
+                _ => ScalarValue::Date64(None),
+            };
+            return Ok(ColumnarValue::Scalar(null));
+        }
+
+        // Do math
+        let posterior = match scalar {
+            ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
+            ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign),
+            ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
+            other => Err(DataFusionError::Execution(format!(
+                "DateIntervalExpr does not support non-interval type {:?}",
+                other
+            )))?,
+        };
+
+        // convert back
+        let res = match operand {
+            ScalarValue::Date32(Some(_)) => {
+                let days = posterior.sub(epoch).num_days() as i32;
+                ColumnarValue::Scalar(ScalarValue::Date32(Some(days)))
+            }
+            ScalarValue::Date64(Some(_)) => {
+                let ms = posterior.sub(epoch).num_milliseconds();
+                ColumnarValue::Scalar(ScalarValue::Date64(Some(ms)))
+            }
+            // unreachable: `prior` above already rejected every other lhs value;
+            // report the date operand (not the interval) if it ever happens
+            _ => Err(DataFusionError::Execution(format!(
+                "Invalid lhs type for DateIntervalExpr: {:?}",
+                operand
+            )))?,
+        };
+        Ok(res)
+    }
+}
+
+// Can remove once https://github.com/apache/arrow-rs/pull/2031 is released
+/// Applies an Arrow `IntervalMonthDayNano` value, scaled by `sign`, to `prior`.
+///
+/// The i128 is unpacked as months (bits 0-31), days (bits 32-63) and
+/// nanoseconds (bits 64-127). Months are applied first via `shift_months`,
+/// then days, then nanoseconds. `sign` is expected to be 1 or -1.
+///
+/// NOTE(review): each component is multiplied by `sign` without overflow
+/// checks, so an `i32::MIN` month/day value with `sign == -1` overflows.
+/// Whether chrono keeps or drops a sub-day `Duration` added to a `NaiveDate`
+/// is not visible here — confirm the intended rounding for the nanos part.
+fn add_m_d_nano(prior: NaiveDate, interval: i128, sign: i32) -> NaiveDate {
+    let interval = interval as u128;
+    let nanos = (interval >> 64) as i64 * sign as i64;
+    let days = (interval >> 32) as i32 * sign;
+    let months = interval as i32 * sign;
+    let a = shift_months(prior, months);
+    let b = a.add(Duration::days(days as i64));
+    b.add(Duration::nanoseconds(nanos))
+}
+
+// Can remove once https://github.com/apache/arrow-rs/pull/2031 is released
+/// Applies an Arrow `IntervalDayTime` value, scaled by `sign`, to `prior`.
+///
+/// The i64 is unpacked as days (high 32 bits) and milliseconds (low 32 bits).
+/// Days are added first, then milliseconds. `sign` is expected to be 1 or -1.
+///
+/// NOTE(review): the `* sign` multiplications are unchecked, and whether a
+/// sub-day millisecond `Duration` changes a `NaiveDate` depends on chrono —
+/// confirm intended behavior.
+fn add_day_time(prior: NaiveDate, interval: i64, sign: i32) -> NaiveDate {
+    let interval = interval as u64;
+    let days = (interval >> 32) as i32 * sign;
+    let ms = interval as i32 * sign;
+    let intermediate = prior.add(Duration::days(days as i64));
+    intermediate.add(Duration::milliseconds(ms as i64))
+}
+
+// Unit tests for DATE +/- INTERVAL evaluation and the month-shifting helper.
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::create_physical_expr;
+    use crate::execution_props::ExecutionProps;
+    use arrow::array::{ArrayRef, Date32Builder};
+    use arrow::datatypes::*;
+    use datafusion_common::{Result, ToDFSchema};
+    use datafusion_expr::Expr;
+
+    // Direct checks of `shift_months` across year boundaries in both directions.
+    #[test]
+    fn add_11_months() {
+        let prior = NaiveDate::from_ymd(2000, 1, 1);
+        let actual = shift_months(prior, 11);
+        assert_eq!(format!("{:?}", actual).as_str(), "2000-12-01");
+    }
+
+    #[test]
+    fn add_12_months() {
+        let prior = NaiveDate::from_ymd(2000, 1, 1);
+        let actual = shift_months(prior, 12);
+        assert_eq!(format!("{:?}", actual).as_str(), "2001-01-01");
+    }
+
+    #[test]
+    fn add_13_months() {
+        let prior = NaiveDate::from_ymd(2000, 1, 1);
+        let actual = shift_months(prior, 13);
+        assert_eq!(format!("{:?}", actual).as_str(), "2001-02-01");
+    }
+
+    #[test]
+    fn sub_11_months() {
+        let prior = NaiveDate::from_ymd(2000, 1, 1);
+        let actual = shift_months(prior, -11);
+        assert_eq!(format!("{:?}", actual).as_str(), "1999-02-01");
+    }
+
+    #[test]
+    fn sub_12_months() {
+        let prior = NaiveDate::from_ymd(2000, 1, 1);
+        let actual = shift_months(prior, -12);
+        assert_eq!(format!("{:?}", actual).as_str(), "1999-01-01");
+    }
+
+    #[test]
+    fn sub_13_months() {
+        let prior = NaiveDate::from_ymd(2000, 1, 1);
+        let actual = shift_months(prior, -13);
+        assert_eq!(format!("{:?}", actual).as_str(), "1998-12-01");
+    }
+
+    // Epoch (Date32 day 0) plus one day (0 ms) is 1970-01-02.
+    #[test]
+    fn add_32_day_time() -> Result<()> {
+        // setup
+        let dt = Expr::Literal(ScalarValue::Date32(Some(0)));
+        let op = Operator::Plus;
+        let interval = create_day_time(1, 0);
+        let interval = 
Expr::Literal(ScalarValue::IntervalDayTime(Some(interval)));
+
+        // exercise
+        let res = exercise(&dt, op, &interval)?;
+
+        // assert
+        match res {
+            ColumnarValue::Scalar(ScalarValue::Date32(Some(d))) => {
+                let epoch = NaiveDate::from_ymd(1970, 1, 1);
+                let res = epoch.add(Duration::days(d as i64));
+                assert_eq!(format!("{:?}", res).as_str(), "1970-01-02");
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            ))?,
+        }
+
+        Ok(())
+    }
+
+    // Epoch minus 13 months is 1968-12-01.
+    #[test]
+    fn sub_32_year_month() -> Result<()> {
+        // setup
+        let dt = Expr::Literal(ScalarValue::Date32(Some(0)));
+        let op = Operator::Minus;
+        let interval = Expr::Literal(ScalarValue::IntervalYearMonth(Some(13)));
+
+        // exercise
+        let res = exercise(&dt, op, &interval)?;
+
+        // assert
+        match res {
+            ColumnarValue::Scalar(ScalarValue::Date32(Some(d))) => {
+                let epoch = NaiveDate::from_ymd(1970, 1, 1);
+                let res = epoch.add(Duration::days(d as i64));
+                assert_eq!(format!("{:?}", res).as_str(), "1968-12-01");
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            ))?,
+        }
+
+        Ok(())
+    }
+
+    // Date64 epoch plus (-15 days, -86_400_000 ms = -1 day) is 1969-12-16.
+    #[test]
+    fn add_64_day_time() -> Result<()> {
+        // setup
+        let dt = Expr::Literal(ScalarValue::Date64(Some(0)));
+        let op = Operator::Plus;
+        let interval = create_day_time(-15, -24 * 60 * 60 * 1000);
+        let interval = 
Expr::Literal(ScalarValue::IntervalDayTime(Some(interval)));
+
+        // exercise
+        let res = exercise(&dt, op, &interval)?;
+
+        // assert
+        match res {
+            ColumnarValue::Scalar(ScalarValue::Date64(Some(d))) => {
+                let epoch = NaiveDate::from_ymd(1970, 1, 1);
+                let res = epoch.add(Duration::milliseconds(d as i64));
+                assert_eq!(format!("{:?}", res).as_str(), "1969-12-16");
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            ))?,
+        }
+
+        Ok(())
+    }
+
+    // Epoch plus one month is 1970-02-01.
+    #[test]
+    fn add_32_year_month() -> Result<()> {
+        // setup
+        let dt = Expr::Literal(ScalarValue::Date32(Some(0)));
+        let op = Operator::Plus;
+        let interval = Expr::Literal(ScalarValue::IntervalYearMonth(Some(1)));
+
+        // exercise
+        let res = exercise(&dt, op, &interval)?;
+
+        // assert
+        match res {
+            ColumnarValue::Scalar(ScalarValue::Date32(Some(d))) => {
+                let epoch = NaiveDate::from_ymd(1970, 1, 1);
+                let res = epoch.add(Duration::days(d as i64));
+                assert_eq!(format!("{:?}", res).as_str(), "1970-02-01");
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            ))?,
+        }
+
+        Ok(())
+    }
+
+    // Epoch plus (-12 months, -15 days, -42 ns) lands on 1968-12-17.
+    #[test]
+    fn add_32_month_day_nano() -> Result<()> {
+        // setup
+        let dt = Expr::Literal(ScalarValue::Date32(Some(0)));
+        let op = Operator::Plus;
+
+        let interval = create_month_day_nano(-12, -15, -42);
+
+        let interval = 
Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(interval)));
+
+        // exercise
+        let res = exercise(&dt, op, &interval)?;
+
+        // assert
+        match res {
+            ColumnarValue::Scalar(ScalarValue::Date32(Some(d))) => {
+                let epoch = NaiveDate::from_ymd(1970, 1, 1);
+                let res = epoch.add(Duration::days(d as i64));
+                assert_eq!(format!("{:?}", res).as_str(), "1968-12-17");
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            ))?,
         }
+
+        Ok(())
+    }
+
+    // An untyped NULL interval must be rejected with an error.
+    #[test]
+    fn invalid_interval() -> Result<()> {
+        // setup
+        let dt = Expr::Literal(ScalarValue::Date32(Some(0)));
+        let op = Operator::Plus;
+        let interval = Expr::Literal(ScalarValue::Null);
+
+        // exercise
+        let res = exercise(&dt, op, &interval);
+        assert!(res.is_err(), "Can't add a NULL interval");
+
+        Ok(())
+    }
+
+    // An untyped NULL date must be rejected with an error.
+    #[test]
+    fn invalid_date() -> Result<()> {
+        // setup
+        let dt = Expr::Literal(ScalarValue::Null);
+        let op = Operator::Plus;
+        let interval = 
Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(0)));
+
+        // exercise
+        let res = exercise(&dt, op, &interval);
+        assert!(res.is_err(), "Can't add to NULL date");
+
+        Ok(())
+    }
+
+    // Operators other than + and - must be rejected.
+    #[test]
+    fn invalid_op() -> Result<()> {
+        // setup
+        let dt = Expr::Literal(ScalarValue::Date32(Some(0)));
+        let op = Operator::Eq;
+        let interval = 
Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(0)));
+
+        // exercise
+        let res = exercise(&dt, op, &interval);
+        assert!(res.is_err(), "Can't add dates with == operator");
+
+        Ok(())
+    }
+
+    /// Plans `dt` and `interval` as physical expressions, builds a
+    /// `DateIntervalExpr` with `op`, and evaluates it against a one-row Date32
+    /// batch. The tests pass literals, so the batch mainly satisfies `evaluate`.
+    fn exercise(dt: &Expr, op: Operator, interval: &Expr) -> 
Result<ColumnarValue> {
+        let mut builder = Date32Builder::new(1);
+        builder.append_value(0).unwrap();
+        let a: ArrayRef = Arc::new(builder.finish());
+        let schema = Schema::new(vec![Field::new("a", DataType::Date32, 
false)]);
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
+
+        let dfs = schema.clone().to_dfschema()?;
+        let props = ExecutionProps::new();
+
+        let lhs = create_physical_expr(dt, &dfs, &schema, &props)?;
+        let rhs = create_physical_expr(interval, &dfs, &schema, &props)?;
+
+        let cut = DateIntervalExpr::try_new(lhs, op, rhs, &schema)?;
+        let res = cut.evaluate(&batch)?;
+        Ok(res)
+    }
+
+    // Can remove once https://github.com/apache/arrow-rs/pull/2031 is released
+
+    /// Creates an IntervalDayTime given its constituent components
+    ///
+    /// Days occupy the high 32 bits and milliseconds the low 32 bits.
+    ///
+    /// 
https://github.com/apache/arrow-rs/blob/e59b023480437f67e84ba2f827b58f78fd44c3a1/integration-testing/src/lib.rs#L222
+    fn create_day_time(days: i32, millis: i32) -> i64 {
+        let m = millis as u64 & u32::MAX as u64;
+        let d = (days as u64 & u32::MAX as u64) << 32;
+        (m | d) as i64
+    }
+
+    // Can remove once https://github.com/apache/arrow-rs/pull/2031 is released
+    /// Creates an IntervalMonthDayNano given its constituent components
+    ///
+    /// Months occupy bits 0-31, days bits 32-63 and nanoseconds bits 64-127.
+    ///
+    /// Source: 
https://github.com/apache/arrow-rs/blob/e59b023480437f67e84ba2f827b58f78fd44c3a1/integration-testing/src/lib.rs#L340
+    ///     ((nanoseconds as i128) & 0xFFFFFFFFFFFFFFFF) << 64
+    ///     | ((days as i128) & 0xFFFFFFFF) << 32
+    ///     | ((months as i128) & 0xFFFFFFFF);
+    fn create_month_day_nano(months: i32, days: i32, nanos: i64) -> i128 {
+        let m = months as u128 & u32::MAX as u128;
+        let d = (days as u128 & u32::MAX as u128) << 32;
+        let n = (nanos as u128) << 64;
+        (m | d | n) as i128
     }
 }
diff --git a/datafusion/physical-expr/src/expressions/delta.rs 
b/datafusion/physical-expr/src/expressions/delta.rs
new file mode 100644
index 000000000..b7efdab0a
--- /dev/null
+++ b/datafusion/physical-expr/src/expressions/delta.rs
@@ -0,0 +1,182 @@
+// MIT License
+//
+// Copyright (c) 2020-2022 Oliver Margetts
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to 
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in 
all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 
THE
+// SOFTWARE.
+
+// Copied from chronoutil crate
+
+//! Contains utility functions for shifting Date objects.
+use chrono::Datelike;
+
+/// Returns true if the year is a leap-year, as naively defined in the 
Gregorian calendar.
+///
+/// A year is a leap year when divisible by 4, except century years, which
+/// must also be divisible by 400 (so 2000 is a leap year, 1900 is not).
+#[inline]
+pub(crate) fn is_leap_year(year: i32) -> bool {
+    year % 4 == 0 && (year % 100 != 0 || year % 400 == 0)
+}
+
+// If the day lies within the month, this function has no effect. Otherwise, 
it shifts
+// day backwards to the final day of the month.
+// XXX: No attempt is made to handle days outside the 1-31 range.
+// Only February (28 or 29 days) and the 30-day months April, June, September
+// and November can need adjusting; days 29-31 in any other month are returned
+// unchanged.
+#[inline]
+fn normalise_day(year: i32, month: u32, day: u32) -> u32 {
+    if day <= 28 {
+        day
+    } else if month == 2 {
+        28 + is_leap_year(year) as u32
+    } else if day == 31 && (month == 4 || month == 6 || month == 9 || month == 
11) {
+        30
+    } else {
+        day
+    }
+}
+
+/// Shift a date by the given number of months.
+/// Ambiguous month-ends are shifted backwards as necessary.
+///
+/// Negative `months` shift backwards. Generic over `Datelike`, so the
+/// time-of-day of a datetime input is carried through unchanged.
+pub(crate) fn shift_months<D: Datelike>(date: D, months: i32) -> D {
+    // `/` and `%` truncate toward zero, so the month computed here may be 0 or
+    // negative; the `month < 1` fix-up below folds it back into 1..=12.
+    let mut year = date.year() + (date.month() as i32 + months) / 12;
+    let mut month = (date.month() as i32 + months) % 12;
+    let mut day = date.day();
+
+    if month < 1 {
+        year -= 1;
+        month += 12;
+    }
+
+    // Clamp e.g. the 31st to the last valid day of the target month.
+    day = normalise_day(year, month as u32, day);
+
+    // This is slow but guaranteed to succeed (short of integer overflow)
+    if day <= 28 {
+        date.with_day(day)
+            .unwrap()
+            .with_month(month as u32)
+            .unwrap()
+            .with_year(year)
+            .unwrap()
+    } else {
+        // Move to day 1 first so the intermediate month/year changes are always
+        // valid, then set the (already normalised) final day.
+        date.with_day(1)
+            .unwrap()
+            .with_month(month as u32)
+            .unwrap()
+            .with_year(year)
+            .unwrap()
+            .with_day(day)
+            .unwrap()
+    }
+}
+
+// Unit tests for the leap-year rule and month shifting (including clamping).
+#[cfg(test)]
+mod tests {
+    use std::collections::HashSet;
+
+    use chrono::naive::{NaiveDate, NaiveDateTime, NaiveTime};
+
+    use super::*;
+
+    // Compares is_leap_year with a hard-coded list for 1900..=2020; 1900 is
+    // deliberately absent because century years not divisible by 400 are not leap.
+    #[test]
+    fn test_leap_year_cases() {
+        let _leap_years: Vec<i32> = vec![
+            1904, 1908, 1912, 1916, 1920, 1924, 1928, 1932, 1936, 1940, 1944, 
1948, 1952,
+            1956, 1960, 1964, 1968, 1972, 1976, 1980, 1984, 1988, 1992, 1996, 
2000, 2004,
+            2008, 2012, 2016, 2020,
+        ];
+        let leap_years_1900_to_2020: HashSet<i32> = 
_leap_years.into_iter().collect();
+
+        for year in 1900..2021 {
+            assert_eq!(is_leap_year(year), 
leap_years_1900_to_2020.contains(&year))
+        }
+    }
+
+    // From January 31st, shifted dates clamp to the last day of shorter months.
+    #[test]
+    fn test_shift_months() {
+        let base = NaiveDate::from_ymd(2020, 1, 31);
+
+        assert_eq!(shift_months(base, 0), NaiveDate::from_ymd(2020, 1, 31));
+        assert_eq!(shift_months(base, 1), NaiveDate::from_ymd(2020, 2, 29));
+        assert_eq!(shift_months(base, 2), NaiveDate::from_ymd(2020, 3, 31));
+        assert_eq!(shift_months(base, 3), NaiveDate::from_ymd(2020, 4, 30));
+        assert_eq!(shift_months(base, 4), NaiveDate::from_ymd(2020, 5, 31));
+        assert_eq!(shift_months(base, 5), NaiveDate::from_ymd(2020, 6, 30));
+        assert_eq!(shift_months(base, 6), NaiveDate::from_ymd(2020, 7, 31));
+        assert_eq!(shift_months(base, 7), NaiveDate::from_ymd(2020, 8, 31));
+        assert_eq!(shift_months(base, 8), NaiveDate::from_ymd(2020, 9, 30));
+        assert_eq!(shift_months(base, 9), NaiveDate::from_ymd(2020, 10, 31));
+        assert_eq!(shift_months(base, 10), NaiveDate::from_ymd(2020, 11, 30));
+        assert_eq!(shift_months(base, 11), NaiveDate::from_ymd(2020, 12, 31));
+        assert_eq!(shift_months(base, 12), NaiveDate::from_ymd(2021, 1, 31));
+        assert_eq!(shift_months(base, 13), NaiveDate::from_ymd(2021, 2, 28));
+
+        assert_eq!(shift_months(base, -1), NaiveDate::from_ymd(2019, 12, 31));
+        assert_eq!(shift_months(base, -2), NaiveDate::from_ymd(2019, 11, 30));
+        assert_eq!(shift_months(base, -3), NaiveDate::from_ymd(2019, 10, 31));
+        assert_eq!(shift_months(base, -4), NaiveDate::from_ymd(2019, 9, 30));
+        assert_eq!(shift_months(base, -5), NaiveDate::from_ymd(2019, 8, 31));
+        assert_eq!(shift_months(base, -6), NaiveDate::from_ymd(2019, 7, 31));
+        assert_eq!(shift_months(base, -7), NaiveDate::from_ymd(2019, 6, 30));
+        assert_eq!(shift_months(base, -8), NaiveDate::from_ymd(2019, 5, 31));
+        assert_eq!(shift_months(base, -9), NaiveDate::from_ymd(2019, 4, 30));
+        assert_eq!(shift_months(base, -10), NaiveDate::from_ymd(2019, 3, 31));
+        assert_eq!(shift_months(base, -11), NaiveDate::from_ymd(2019, 2, 28));
+        assert_eq!(shift_months(base, -12), NaiveDate::from_ymd(2019, 1, 31));
+        assert_eq!(shift_months(base, -13), NaiveDate::from_ymd(2018, 12, 31));
+
+        assert_eq!(shift_months(base, 1265), NaiveDate::from_ymd(2125, 6, 30));
+    }
+
+    // From December 31st, shifts cross year boundaries in both directions.
+    #[test]
+    fn test_shift_months_with_overflow() {
+        let base = NaiveDate::from_ymd(2020, 12, 31);
+
+        assert_eq!(shift_months(base, 0), base);
+        assert_eq!(shift_months(base, 1), NaiveDate::from_ymd(2021, 1, 31));
+        assert_eq!(shift_months(base, 2), NaiveDate::from_ymd(2021, 2, 28));
+        assert_eq!(shift_months(base, 12), NaiveDate::from_ymd(2021, 12, 31));
+        assert_eq!(shift_months(base, 18), NaiveDate::from_ymd(2022, 6, 30));
+
+        assert_eq!(shift_months(base, -1), NaiveDate::from_ymd(2020, 11, 30));
+        assert_eq!(shift_months(base, -2), NaiveDate::from_ymd(2020, 10, 31));
+        assert_eq!(shift_months(base, -10), NaiveDate::from_ymd(2020, 2, 29));
+        assert_eq!(shift_months(base, -12), NaiveDate::from_ymd(2019, 12, 31));
+        assert_eq!(shift_months(base, -18), NaiveDate::from_ymd(2019, 6, 30));
+    }
+
+    // shift_months on a NaiveDateTime changes the date and preserves the time.
+    #[test]
+    fn test_shift_months_datetime() {
+        let date = NaiveDate::from_ymd(2020, 1, 31);
+        let o_clock = NaiveTime::from_hms(1, 2, 3);
+
+        let base = NaiveDateTime::new(date, o_clock);
+
+        assert_eq!(
+            shift_months(base, 0).date(),
+            NaiveDate::from_ymd(2020, 1, 31)
+        );
+        assert_eq!(
+            shift_months(base, 1).date(),
+            NaiveDate::from_ymd(2020, 2, 29)
+        );
+        assert_eq!(
+            shift_months(base, 2).date(),
+            NaiveDate::from_ymd(2020, 3, 31)
+        );
+        assert_eq!(shift_months(base, 0).time(), o_clock);
+        assert_eq!(shift_months(base, 1).time(), o_clock);
+        assert_eq!(shift_months(base, 2).time(), o_clock);
+    }
+}
diff --git a/datafusion/physical-expr/src/expressions/mod.rs 
b/datafusion/physical-expr/src/expressions/mod.rs
index 8eb95f2bf..7a78f4603 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -23,6 +23,7 @@ mod case;
 mod cast;
 mod column;
 mod datetime;
+mod delta;
 mod get_indexed_field;
 mod in_list;
 mod is_not_null;

Reply via email to