This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 5a63c871f Add support for month & year intervals (#2797)
5a63c871f is described below
commit 5a63c871fe2b426f2c513d5553aaa04819eb833d
Author: Brent Gardner <[email protected]>
AuthorDate: Tue Jul 12 09:46:51 2022 -0600
Add support for month & year intervals (#2797)
---
datafusion/common/src/scalar.rs | 15 +-
datafusion/core/tests/sql/timestamp.rs | 120 ++++++
datafusion/optimizer/src/simplify_expressions.rs | 8 +-
.../physical-expr/src/expressions/datetime.rs | 424 +++++++++++++++++----
datafusion/physical-expr/src/expressions/delta.rs | 182 +++++++++
datafusion/physical-expr/src/expressions/mod.rs | 1 +
6 files changed, 673 insertions(+), 77 deletions(-)
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 76d94677c..25bb96a40 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -38,6 +38,8 @@ use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc};
/// Represents a dynamically typed, nullable single value.
/// This is the single-valued counter-part of arrow’s `Array`.
+/// https://arrow.apache.org/docs/python/api/datatypes.html
+/// https://github.com/apache/arrow/blob/master/format/Schema.fbs#L354-L375
#[derive(Clone)]
pub enum ScalarValue {
/// represents `DataType::Null` (castable to/from any other type)
@@ -76,9 +78,9 @@ pub enum ScalarValue {
LargeBinary(Option<Vec<u8>>),
/// list of nested ScalarValue
List(Option<Vec<ScalarValue>>, Box<DataType>),
- /// Date stored as a signed 32bit int
+ /// Date stored as a signed 32bit int days since UNIX epoch 1970-01-01
Date32(Option<i32>),
- /// Date stored as a signed 64bit int
+ /// Date stored as a signed 64bit int milliseconds since UNIX epoch 1970-01-01
Date64(Option<i64>),
/// Timestamp Second
TimestampSecond(Option<i64>, Option<String>),
@@ -88,11 +90,14 @@ pub enum ScalarValue {
TimestampMicrosecond(Option<i64>, Option<String>),
/// Timestamp Nanoseconds
TimestampNanosecond(Option<i64>, Option<String>),
- /// Interval with YearMonth unit
+ /// Number of elapsed whole months
IntervalYearMonth(Option<i32>),
- /// Interval with DayTime unit
+ /// Number of elapsed days and milliseconds (no leap seconds)
+ /// stored as 2 contiguous 32-bit signed integers
IntervalDayTime(Option<i64>),
- /// Interval with MonthDayNano unit
+ /// A triple of the number of elapsed months, days, and nanoseconds.
+ /// Months and days are encoded as 32-bit signed integers.
+ /// Nanoseconds is encoded as a 64-bit signed integer (no leap seconds).
IntervalMonthDayNano(Option<i128>),
/// struct of nested ScalarValue
Struct(Option<Vec<ScalarValue>>, Box<Vec<Field>>),
diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs
index 1e475fb17..9acc3f3cb 100644
--- a/datafusion/core/tests/sql/timestamp.rs
+++ b/datafusion/core/tests/sql/timestamp.rs
@@ -814,3 +814,123 @@ async fn group_by_timestamp_millis() -> Result<()> {
assert_batches_eq!(expected, &actual);
Ok(())
}
+
+#[tokio::test]
+async fn interval_year() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let sql = "select date '1994-01-01' + interval '1' year as date;";
+ let results = execute_to_batches(&ctx, sql).await;
+
+ let expected = vec![
+ "+------------+",
+ "| date |",
+ "+------------+",
+ "| 1995-01-01 |",
+ "+------------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn add_interval_month() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let sql = "select date '1994-01-31' + interval '1' month as date;";
+ let results = execute_to_batches(&ctx, sql).await;
+
+ let expected = vec![
+ "+------------+",
+ "| date |",
+ "+------------+",
+ "| 1994-02-28 |",
+ "+------------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn sub_interval_month() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let sql = "select date '1994-03-31' - interval '1' month as date;";
+ let results = execute_to_batches(&ctx, sql).await;
+
+ let expected = vec![
+ "+------------+",
+ "| date |",
+ "+------------+",
+ "| 1994-02-28 |",
+ "+------------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn sub_month_wrap() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let sql = "select date '1994-01-15' - interval '1' month as date;";
+ let results = execute_to_batches(&ctx, sql).await;
+
+ let expected = vec![
+ "+------------+",
+ "| date |",
+ "+------------+",
+ "| 1993-12-15 |",
+ "+------------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn add_interval_day() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let sql = "select date '1994-01-15' + interval '1' day as date;";
+ let results = execute_to_batches(&ctx, sql).await;
+
+ let expected = vec![
+ "+------------+",
+ "| date |",
+ "+------------+",
+ "| 1994-01-16 |",
+ "+------------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn sub_interval_day() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let sql = "select date '1994-01-01' - interval '1' day as date;";
+ let results = execute_to_batches(&ctx, sql).await;
+
+ let expected = vec![
+ "+------------+",
+ "| date |",
+ "+------------+",
+ "| 1993-12-31 |",
+ "+------------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ Ok(())
+}
diff --git a/datafusion/optimizer/src/simplify_expressions.rs b/datafusion/optimizer/src/simplify_expressions.rs
index dd63da989..8974639ea 100644
--- a/datafusion/optimizer/src/simplify_expressions.rs
+++ b/datafusion/optimizer/src/simplify_expressions.rs
@@ -1946,7 +1946,7 @@ mod tests {
let date_plus_interval_expr = to_timestamp_expr(ts_string)
.cast_to(&DataType::Date32, schema)
.unwrap()
- + Expr::Literal(ScalarValue::IntervalDayTime(Some(123)));
+ + Expr::Literal(ScalarValue::IntervalDayTime(Some(123i64 << 32)));
let plan = LogicalPlanBuilder::from(table_scan.clone())
.project(vec![date_plus_interval_expr])
@@ -1958,10 +1958,10 @@ mod tests {
// Note that constant folder runs and folds the entire
// expression down to a single constant (true)
- let expected = "Projection: Date32(\"18636\") AS
CAST(totimestamp(Utf8(\"2020-09-08T12:05:00+00:00\")) AS Date32) +
IntervalDayTime(\"123\")\
- \n TableScan: test";
+ let expected = r#"Projection: Date32("18636") AS
CAST(totimestamp(Utf8("2020-09-08T12:05:00+00:00")) AS Date32) +
IntervalDayTime("528280977408")
+ TableScan: test"#;
let actual = get_optimized_plan_formatted(&plan, &time);
- assert_eq!(expected, actual);
+ assert_eq!(actual, expected);
}
}
diff --git a/datafusion/physical-expr/src/expressions/datetime.rs b/datafusion/physical-expr/src/expressions/datetime.rs
index 3d84e79f2..d4486a3ff 100644
--- a/datafusion/physical-expr/src/expressions/datetime.rs
+++ b/datafusion/physical-expr/src/expressions/datetime.rs
@@ -15,14 +15,17 @@
// specific language governing permissions and limitations
// under the License.
+use crate::expressions::delta::shift_months;
use crate::PhysicalExpr;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
+use chrono::{Duration, NaiveDate};
use datafusion_common::Result;
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_expr::{ColumnarValue, Operator};
use std::any::Any;
use std::fmt::{Display, Formatter};
+use std::ops::{Add, Sub};
use std::sync::Arc;
/// Perform DATE +/ INTERVAL math
@@ -74,88 +77,373 @@ impl PhysicalExpr for DateIntervalExpr {
self
}
- fn data_type(&self, input_schema: &Schema) ->
datafusion_common::Result<DataType> {
+ fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
self.lhs.data_type(input_schema)
}
- fn nullable(&self, input_schema: &Schema) ->
datafusion_common::Result<bool> {
+ fn nullable(&self, input_schema: &Schema) -> Result<bool> {
self.lhs.nullable(input_schema)
}
- fn evaluate(&self, batch: &RecordBatch) ->
datafusion_common::Result<ColumnarValue> {
+ fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let dates = self.lhs.evaluate(batch)?;
let intervals = self.rhs.evaluate(batch)?;
- let interval = match intervals {
- ColumnarValue::Scalar(interval) => match interval {
- ScalarValue::IntervalDayTime(Some(interval)) => interval as
i32,
- ScalarValue::IntervalYearMonth(Some(_)) => {
- return Err(DataFusionError::Execution(
- "DateIntervalExpr does not support
IntervalYearMonth".to_string(),
- ))
- }
- ScalarValue::IntervalMonthDayNano(Some(_)) => {
- return Err(DataFusionError::Execution(
- "DateIntervalExpr does not support
IntervalMonthDayNano"
- .to_string(),
- ))
- }
- other => {
- return Err(DataFusionError::Execution(format!(
- "DateIntervalExpr does not support non-interval type
{:?}",
- other
- )))
- }
- },
- _ => {
- return Err(DataFusionError::Execution(
- "Columnar execution is not yet supported for
DateIntervalExpr"
- .to_string(),
- ))
- }
+ // Unwrap days since epoch
+ let operand = match dates {
+ ColumnarValue::Scalar(scalar) => scalar,
+ _ => Err(DataFusionError::Execution(
+ "Columnar execution is not yet supported for DateIntervalExpr"
+ .to_string(),
+ ))?,
};
- match dates {
- ColumnarValue::Scalar(scalar) => match scalar {
- ScalarValue::Date32(Some(date)) => match &self.op {
- Operator::Plus =>
Ok(ColumnarValue::Scalar(ScalarValue::Date32(
- Some(date + interval),
- ))),
- Operator::Minus =>
Ok(ColumnarValue::Scalar(ScalarValue::Date32(
- Some(date - interval),
- ))),
- _ => {
- // this should be unreachable because we check the
operators in `try_new`
- Err(DataFusionError::Execution(
- "Invalid operator for
DateIntervalExpr".to_string(),
- ))
- }
- },
- ScalarValue::Date64(Some(date)) => match &self.op {
- Operator::Plus =>
Ok(ColumnarValue::Scalar(ScalarValue::Date64(
- Some(date + interval as i64),
- ))),
- Operator::Minus =>
Ok(ColumnarValue::Scalar(ScalarValue::Date64(
- Some(date - interval as i64),
- ))),
- _ => {
- // this should be unreachable because we check the
operators in `try_new`
- Err(DataFusionError::Execution(
- "Invalid operator for
DateIntervalExpr".to_string(),
- ))
- }
- },
- _ => {
- // this should be unreachable because we check the types
in `try_new`
- Err(DataFusionError::Execution(
- "Invalid lhs type for DateIntervalExpr".to_string(),
- ))
- }
- },
+ // Convert to NaiveDate
+ let epoch = NaiveDate::from_ymd(1970, 1, 1);
+ let prior = match operand {
+ ScalarValue::Date32(Some(d)) => epoch.add(Duration::days(d as
i64)),
+ ScalarValue::Date64(Some(ms)) =>
epoch.add(Duration::milliseconds(ms)),
+ _ => Err(DataFusionError::Execution(format!(
+ "Invalid lhs type for DateIntervalExpr: {:?}",
+ operand
+ )))?,
+ };
+
+ // Unwrap interval to add
+ let scalar = match &intervals {
+ ColumnarValue::Scalar(interval) => interval,
_ => Err(DataFusionError::Execution(
"Columnar execution is not yet supported for DateIntervalExpr"
.to_string(),
- )),
+ ))?,
+ };
+
+ // Invert sign for subtraction
+ let sign = match &self.op {
+ Operator::Plus => 1,
+ Operator::Minus => -1,
+ _ => {
+ // this should be unreachable because we check the operators in `try_new`
+ Err(DataFusionError::Execution(
+ "Invalid operator for DateIntervalExpr".to_string(),
+ ))?
+ }
+ };
+
+ // Do math
+ let posterior = match scalar {
+ ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i,
sign),
+ ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i
* sign),
+ ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior,
*i, sign),
+ other => Err(DataFusionError::Execution(format!(
+ "DateIntervalExpr does not support non-interval type {:?}",
+ other
+ )))?,
+ };
+
+ // convert back
+ let res = match operand {
+ ScalarValue::Date32(Some(_)) => {
+ let days = posterior.sub(epoch).num_days() as i32;
+ ColumnarValue::Scalar(ScalarValue::Date32(Some(days)))
+ }
+ ScalarValue::Date64(Some(_)) => {
+ let ms = posterior.sub(epoch).num_milliseconds();
+ ColumnarValue::Scalar(ScalarValue::Date64(Some(ms)))
+ }
+ _ => Err(DataFusionError::Execution(format!(
+ "Invalid lhs type for DateIntervalExpr: {}",
+ scalar
+ )))?,
+ };
+ Ok(res)
+ }
+}
+
+// Can remove once https://github.com/apache/arrow-rs/pull/2031 is released
+fn add_m_d_nano(prior: NaiveDate, interval: i128, sign: i32) -> NaiveDate {
+ let interval = interval as u128;
+ let nanos = (interval >> 64) as i64 * sign as i64;
+ let days = (interval >> 32) as i32 * sign;
+ let months = interval as i32 * sign;
+ let a = shift_months(prior, months);
+ let b = a.add(Duration::days(days as i64));
+ b.add(Duration::nanoseconds(nanos))
+}
+
+// Can remove once https://github.com/apache/arrow-rs/pull/2031 is released
+fn add_day_time(prior: NaiveDate, interval: i64, sign: i32) -> NaiveDate {
+ let interval = interval as u64;
+ let days = (interval >> 32) as i32 * sign;
+ let ms = interval as i32 * sign;
+ let intermediate = prior.add(Duration::days(days as i64));
+ intermediate.add(Duration::milliseconds(ms as i64))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::create_physical_expr;
+ use crate::execution_props::ExecutionProps;
+ use arrow::array::{ArrayRef, Date32Builder};
+ use arrow::datatypes::*;
+ use datafusion_common::{Result, ToDFSchema};
+ use datafusion_expr::Expr;
+
+ #[test]
+ fn add_11_months() {
+ let prior = NaiveDate::from_ymd(2000, 1, 1);
+ let actual = shift_months(prior, 11);
+ assert_eq!(format!("{:?}", actual).as_str(), "2000-12-01");
+ }
+
+ #[test]
+ fn add_12_months() {
+ let prior = NaiveDate::from_ymd(2000, 1, 1);
+ let actual = shift_months(prior, 12);
+ assert_eq!(format!("{:?}", actual).as_str(), "2001-01-01");
+ }
+
+ #[test]
+ fn add_13_months() {
+ let prior = NaiveDate::from_ymd(2000, 1, 1);
+ let actual = shift_months(prior, 13);
+ assert_eq!(format!("{:?}", actual).as_str(), "2001-02-01");
+ }
+
+ #[test]
+ fn sub_11_months() {
+ let prior = NaiveDate::from_ymd(2000, 1, 1);
+ let actual = shift_months(prior, -11);
+ assert_eq!(format!("{:?}", actual).as_str(), "1999-02-01");
+ }
+
+ #[test]
+ fn sub_12_months() {
+ let prior = NaiveDate::from_ymd(2000, 1, 1);
+ let actual = shift_months(prior, -12);
+ assert_eq!(format!("{:?}", actual).as_str(), "1999-01-01");
+ }
+
+ #[test]
+ fn sub_13_months() {
+ let prior = NaiveDate::from_ymd(2000, 1, 1);
+ let actual = shift_months(prior, -13);
+ assert_eq!(format!("{:?}", actual).as_str(), "1998-12-01");
+ }
+
+ #[test]
+ fn add_32_day_time() -> Result<()> {
+ // setup
+ let dt = Expr::Literal(ScalarValue::Date32(Some(0)));
+ let op = Operator::Plus;
+ let interval = create_day_time(1, 0);
+ let interval =
Expr::Literal(ScalarValue::IntervalDayTime(Some(interval)));
+
+ // exercise
+ let res = exercise(&dt, op, &interval)?;
+
+ // assert
+ match res {
+ ColumnarValue::Scalar(ScalarValue::Date32(Some(d))) => {
+ let epoch = NaiveDate::from_ymd(1970, 1, 1);
+ let res = epoch.add(Duration::days(d as i64));
+ assert_eq!(format!("{:?}", res).as_str(), "1970-01-02");
+ }
+ _ => Err(DataFusionError::NotImplemented(
+ "Unexpected result!".to_string(),
+ ))?,
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn sub_32_year_month() -> Result<()> {
+ // setup
+ let dt = Expr::Literal(ScalarValue::Date32(Some(0)));
+ let op = Operator::Minus;
+ let interval = Expr::Literal(ScalarValue::IntervalYearMonth(Some(13)));
+
+ // exercise
+ let res = exercise(&dt, op, &interval)?;
+
+ // assert
+ match res {
+ ColumnarValue::Scalar(ScalarValue::Date32(Some(d))) => {
+ let epoch = NaiveDate::from_ymd(1970, 1, 1);
+ let res = epoch.add(Duration::days(d as i64));
+ assert_eq!(format!("{:?}", res).as_str(), "1968-12-01");
+ }
+ _ => Err(DataFusionError::NotImplemented(
+ "Unexpected result!".to_string(),
+ ))?,
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn add_64_day_time() -> Result<()> {
+ // setup
+ let dt = Expr::Literal(ScalarValue::Date64(Some(0)));
+ let op = Operator::Plus;
+ let interval = create_day_time(-15, -24 * 60 * 60 * 1000);
+ let interval =
Expr::Literal(ScalarValue::IntervalDayTime(Some(interval)));
+
+ // exercise
+ let res = exercise(&dt, op, &interval)?;
+
+ // assert
+ match res {
+ ColumnarValue::Scalar(ScalarValue::Date64(Some(d))) => {
+ let epoch = NaiveDate::from_ymd(1970, 1, 1);
+ let res = epoch.add(Duration::milliseconds(d as i64));
+ assert_eq!(format!("{:?}", res).as_str(), "1969-12-16");
+ }
+ _ => Err(DataFusionError::NotImplemented(
+ "Unexpected result!".to_string(),
+ ))?,
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn add_32_year_month() -> Result<()> {
+ // setup
+ let dt = Expr::Literal(ScalarValue::Date32(Some(0)));
+ let op = Operator::Plus;
+ let interval = Expr::Literal(ScalarValue::IntervalYearMonth(Some(1)));
+
+ // exercise
+ let res = exercise(&dt, op, &interval)?;
+
+ // assert
+ match res {
+ ColumnarValue::Scalar(ScalarValue::Date32(Some(d))) => {
+ let epoch = NaiveDate::from_ymd(1970, 1, 1);
+ let res = epoch.add(Duration::days(d as i64));
+ assert_eq!(format!("{:?}", res).as_str(), "1970-02-01");
+ }
+ _ => Err(DataFusionError::NotImplemented(
+ "Unexpected result!".to_string(),
+ ))?,
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn add_32_month_day_nano() -> Result<()> {
+ // setup
+ let dt = Expr::Literal(ScalarValue::Date32(Some(0)));
+ let op = Operator::Plus;
+
+ let interval = create_month_day_nano(-12, -15, -42);
+
+ let interval =
Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(interval)));
+
+ // exercise
+ let res = exercise(&dt, op, &interval)?;
+
+ // assert
+ match res {
+ ColumnarValue::Scalar(ScalarValue::Date32(Some(d))) => {
+ let epoch = NaiveDate::from_ymd(1970, 1, 1);
+ let res = epoch.add(Duration::days(d as i64));
+ assert_eq!(format!("{:?}", res).as_str(), "1968-12-17");
+ }
+ _ => Err(DataFusionError::NotImplemented(
+ "Unexpected result!".to_string(),
+ ))?,
}
+
+ Ok(())
+ }
+
+ #[test]
+ fn invalid_interval() -> Result<()> {
+ // setup
+ let dt = Expr::Literal(ScalarValue::Date32(Some(0)));
+ let op = Operator::Plus;
+ let interval = Expr::Literal(ScalarValue::Null);
+
+ // exercise
+ let res = exercise(&dt, op, &interval);
+ assert!(res.is_err(), "Can't add a NULL interval");
+
+ Ok(())
+ }
+
+ #[test]
+ fn invalid_date() -> Result<()> {
+ // setup
+ let dt = Expr::Literal(ScalarValue::Null);
+ let op = Operator::Plus;
+ let interval =
Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(0)));
+
+ // exercise
+ let res = exercise(&dt, op, &interval);
+ assert!(res.is_err(), "Can't add to NULL date");
+
+ Ok(())
+ }
+
+ #[test]
+ fn invalid_op() -> Result<()> {
+ // setup
+ let dt = Expr::Literal(ScalarValue::Date32(Some(0)));
+ let op = Operator::Eq;
+ let interval =
Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(0)));
+
+ // exercise
+ let res = exercise(&dt, op, &interval);
+ assert!(res.is_err(), "Can't add dates with == operator");
+
+ Ok(())
+ }
+
+ fn exercise(dt: &Expr, op: Operator, interval: &Expr) ->
Result<ColumnarValue> {
+ let mut builder = Date32Builder::new(1);
+ builder.append_value(0).unwrap();
+ let a: ArrayRef = Arc::new(builder.finish());
+ let schema = Schema::new(vec![Field::new("a", DataType::Date32,
false)]);
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
+
+ let dfs = schema.clone().to_dfschema()?;
+ let props = ExecutionProps::new();
+
+ let lhs = create_physical_expr(dt, &dfs, &schema, &props)?;
+ let rhs = create_physical_expr(interval, &dfs, &schema, &props)?;
+
+ let cut = DateIntervalExpr::try_new(lhs, op, rhs, &schema)?;
+ let res = cut.evaluate(&batch)?;
+ Ok(res)
+ }
+
+ // Can remove once https://github.com/apache/arrow-rs/pull/2031 is released
+
+ /// Creates an IntervalDayTime given its constituent components
+ ///
+ /// https://github.com/apache/arrow-rs/blob/e59b023480437f67e84ba2f827b58f78fd44c3a1/integration-testing/src/lib.rs#L222
+ fn create_day_time(days: i32, millis: i32) -> i64 {
+ let m = millis as u64 & u32::MAX as u64;
+ let d = (days as u64 & u32::MAX as u64) << 32;
+ (m | d) as i64
+ }
+
+ // Can remove once https://github.com/apache/arrow-rs/pull/2031 is released
+ /// Creates an IntervalMonthDayNano given its constituent components
+ ///
+ /// Source: https://github.com/apache/arrow-rs/blob/e59b023480437f67e84ba2f827b58f78fd44c3a1/integration-testing/src/lib.rs#L340
+ /// ((nanoseconds as i128) & 0xFFFFFFFFFFFFFFFF) << 64
+ /// | ((days as i128) & 0xFFFFFFFF) << 32
+ /// | ((months as i128) & 0xFFFFFFFF);
+ fn create_month_day_nano(months: i32, days: i32, nanos: i64) -> i128 {
+ let m = months as u128 & u32::MAX as u128;
+ let d = (days as u128 & u32::MAX as u128) << 32;
+ let n = (nanos as u128) << 64;
+ (m | d | n) as i128
}
}
diff --git a/datafusion/physical-expr/src/expressions/delta.rs b/datafusion/physical-expr/src/expressions/delta.rs
new file mode 100644
index 000000000..b7efdab0a
--- /dev/null
+++ b/datafusion/physical-expr/src/expressions/delta.rs
@@ -0,0 +1,182 @@
+// MIT License
+//
+// Copyright (c) 2020-2022 Oliver Margetts
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE
+// SOFTWARE.
+
+// Copied from chronoutil crate
+
+//! Contains utility functions for shifting Date objects.
+use chrono::Datelike;
+
+/// Returns true if the year is a leap-year, as naively defined in the Gregorian calendar.
+#[inline]
+pub(crate) fn is_leap_year(year: i32) -> bool {
+ year % 4 == 0 && (year % 100 != 0 || year % 400 == 0)
+}
+
+// If the day lies within the month, this function has no effect. Otherwise, it shifts
+// day backwards to the final day of the month.
+// XXX: No attempt is made to handle days outside the 1-31 range.
+#[inline]
+fn normalise_day(year: i32, month: u32, day: u32) -> u32 {
+ if day <= 28 {
+ day
+ } else if month == 2 {
+ 28 + is_leap_year(year) as u32
+ } else if day == 31 && (month == 4 || month == 6 || month == 9 || month ==
11) {
+ 30
+ } else {
+ day
+ }
+}
+
+/// Shift a date by the given number of months.
+/// Ambiguous month-ends are shifted backwards as necessary.
+pub(crate) fn shift_months<D: Datelike>(date: D, months: i32) -> D {
+ let mut year = date.year() + (date.month() as i32 + months) / 12;
+ let mut month = (date.month() as i32 + months) % 12;
+ let mut day = date.day();
+
+ if month < 1 {
+ year -= 1;
+ month += 12;
+ }
+
+ day = normalise_day(year, month as u32, day);
+
+ // This is slow but guaranteed to succeed (short of integer overflow)
+ if day <= 28 {
+ date.with_day(day)
+ .unwrap()
+ .with_month(month as u32)
+ .unwrap()
+ .with_year(year)
+ .unwrap()
+ } else {
+ date.with_day(1)
+ .unwrap()
+ .with_month(month as u32)
+ .unwrap()
+ .with_year(year)
+ .unwrap()
+ .with_day(day)
+ .unwrap()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::HashSet;
+
+ use chrono::naive::{NaiveDate, NaiveDateTime, NaiveTime};
+
+ use super::*;
+
+ #[test]
+ fn test_leap_year_cases() {
+ let _leap_years: Vec<i32> = vec![
+ 1904, 1908, 1912, 1916, 1920, 1924, 1928, 1932, 1936, 1940, 1944,
1948, 1952,
+ 1956, 1960, 1964, 1968, 1972, 1976, 1980, 1984, 1988, 1992, 1996,
2000, 2004,
+ 2008, 2012, 2016, 2020,
+ ];
+ let leap_years_1900_to_2020: HashSet<i32> =
_leap_years.into_iter().collect();
+
+ for year in 1900..2021 {
+ assert_eq!(is_leap_year(year),
leap_years_1900_to_2020.contains(&year))
+ }
+ }
+
+ #[test]
+ fn test_shift_months() {
+ let base = NaiveDate::from_ymd(2020, 1, 31);
+
+ assert_eq!(shift_months(base, 0), NaiveDate::from_ymd(2020, 1, 31));
+ assert_eq!(shift_months(base, 1), NaiveDate::from_ymd(2020, 2, 29));
+ assert_eq!(shift_months(base, 2), NaiveDate::from_ymd(2020, 3, 31));
+ assert_eq!(shift_months(base, 3), NaiveDate::from_ymd(2020, 4, 30));
+ assert_eq!(shift_months(base, 4), NaiveDate::from_ymd(2020, 5, 31));
+ assert_eq!(shift_months(base, 5), NaiveDate::from_ymd(2020, 6, 30));
+ assert_eq!(shift_months(base, 6), NaiveDate::from_ymd(2020, 7, 31));
+ assert_eq!(shift_months(base, 7), NaiveDate::from_ymd(2020, 8, 31));
+ assert_eq!(shift_months(base, 8), NaiveDate::from_ymd(2020, 9, 30));
+ assert_eq!(shift_months(base, 9), NaiveDate::from_ymd(2020, 10, 31));
+ assert_eq!(shift_months(base, 10), NaiveDate::from_ymd(2020, 11, 30));
+ assert_eq!(shift_months(base, 11), NaiveDate::from_ymd(2020, 12, 31));
+ assert_eq!(shift_months(base, 12), NaiveDate::from_ymd(2021, 1, 31));
+ assert_eq!(shift_months(base, 13), NaiveDate::from_ymd(2021, 2, 28));
+
+ assert_eq!(shift_months(base, -1), NaiveDate::from_ymd(2019, 12, 31));
+ assert_eq!(shift_months(base, -2), NaiveDate::from_ymd(2019, 11, 30));
+ assert_eq!(shift_months(base, -3), NaiveDate::from_ymd(2019, 10, 31));
+ assert_eq!(shift_months(base, -4), NaiveDate::from_ymd(2019, 9, 30));
+ assert_eq!(shift_months(base, -5), NaiveDate::from_ymd(2019, 8, 31));
+ assert_eq!(shift_months(base, -6), NaiveDate::from_ymd(2019, 7, 31));
+ assert_eq!(shift_months(base, -7), NaiveDate::from_ymd(2019, 6, 30));
+ assert_eq!(shift_months(base, -8), NaiveDate::from_ymd(2019, 5, 31));
+ assert_eq!(shift_months(base, -9), NaiveDate::from_ymd(2019, 4, 30));
+ assert_eq!(shift_months(base, -10), NaiveDate::from_ymd(2019, 3, 31));
+ assert_eq!(shift_months(base, -11), NaiveDate::from_ymd(2019, 2, 28));
+ assert_eq!(shift_months(base, -12), NaiveDate::from_ymd(2019, 1, 31));
+ assert_eq!(shift_months(base, -13), NaiveDate::from_ymd(2018, 12, 31));
+
+ assert_eq!(shift_months(base, 1265), NaiveDate::from_ymd(2125, 6, 30));
+ }
+
+ #[test]
+ fn test_shift_months_with_overflow() {
+ let base = NaiveDate::from_ymd(2020, 12, 31);
+
+ assert_eq!(shift_months(base, 0), base);
+ assert_eq!(shift_months(base, 1), NaiveDate::from_ymd(2021, 1, 31));
+ assert_eq!(shift_months(base, 2), NaiveDate::from_ymd(2021, 2, 28));
+ assert_eq!(shift_months(base, 12), NaiveDate::from_ymd(2021, 12, 31));
+ assert_eq!(shift_months(base, 18), NaiveDate::from_ymd(2022, 6, 30));
+
+ assert_eq!(shift_months(base, -1), NaiveDate::from_ymd(2020, 11, 30));
+ assert_eq!(shift_months(base, -2), NaiveDate::from_ymd(2020, 10, 31));
+ assert_eq!(shift_months(base, -10), NaiveDate::from_ymd(2020, 2, 29));
+ assert_eq!(shift_months(base, -12), NaiveDate::from_ymd(2019, 12, 31));
+ assert_eq!(shift_months(base, -18), NaiveDate::from_ymd(2019, 6, 30));
+ }
+
+ #[test]
+ fn test_shift_months_datetime() {
+ let date = NaiveDate::from_ymd(2020, 1, 31);
+ let o_clock = NaiveTime::from_hms(1, 2, 3);
+
+ let base = NaiveDateTime::new(date, o_clock);
+
+ assert_eq!(
+ shift_months(base, 0).date(),
+ NaiveDate::from_ymd(2020, 1, 31)
+ );
+ assert_eq!(
+ shift_months(base, 1).date(),
+ NaiveDate::from_ymd(2020, 2, 29)
+ );
+ assert_eq!(
+ shift_months(base, 2).date(),
+ NaiveDate::from_ymd(2020, 3, 31)
+ );
+ assert_eq!(shift_months(base, 0).time(), o_clock);
+ assert_eq!(shift_months(base, 1).time(), o_clock);
+ assert_eq!(shift_months(base, 2).time(), o_clock);
+ }
+}
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index 8eb95f2bf..7a78f4603 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -23,6 +23,7 @@ mod case;
mod cast;
mod column;
mod datetime;
+mod delta;
mod get_indexed_field;
mod in_list;
mod is_not_null;