avantgardnerio commented on code in PR #3110:
URL: https://github.com/apache/arrow-datafusion/pull/3110#discussion_r946056340


##########
datafusion/physical-expr/src/expressions/datetime.rs:
##########
@@ -130,38 +120,196 @@ impl PhysicalExpr for DateIntervalExpr {
             }
         };
 
-        // Do math
-        let posterior = match scalar {
-            ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
-            ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign),
-            ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
-            other => Err(DataFusionError::Execution(format!(
-                "DateIntervalExpr does not support non-interval type {:?}",
-                other
-            )))?,
-        };
-
-        // convert back
-        let res = match operand {
-            ScalarValue::Date32(Some(_)) => {
-                let days = posterior.sub(epoch).num_days() as i32;
-                ColumnarValue::Scalar(ScalarValue::Date32(Some(days)))
-            }
-            ScalarValue::Date64(Some(_)) => {
-                let ms = posterior.sub(epoch).num_milliseconds();
-                ColumnarValue::Scalar(ScalarValue::Date64(Some(ms)))
-            }
-            _ => Err(DataFusionError::Execution(format!(
-                "Invalid lhs type for DateIntervalExpr: {}",
-                scalar
-            )))?,
-        };
-        Ok(res)
+        match dates {
+            ColumnarValue::Scalar(operand) => evaluate_scalar(operand, sign, intervals),
+            ColumnarValue::Array(array) => evaluate_array(array, sign, intervals),
+        }
     }
 }
 
+pub fn evaluate_array(
+    array: ArrayRef,
+    sign: i32,
+    scalar: &ScalarValue,
+) -> Result<ColumnarValue> {
+    let ret = match array.data_type() {
+        DataType::Date32 => {
+            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
+            Arc::new(unary::<Date32Type, _, Date32Type>(array, |days| {
+                date32_add(days, scalar, sign).unwrap()
+            })) as ArrayRef
+        }
+        DataType::Date64 => {
+            let array = array.as_any().downcast_ref::<Date64Array>().unwrap();
+            Arc::new(unary::<Date64Type, _, Date64Type>(array, |ms| {
+                date64_add(ms, scalar, sign).unwrap()
+            })) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Second, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampSecondArray>()
+                .unwrap();
+            Arc::new(unary::<TimestampSecondType, _, TimestampSecondType>(
+                array,
+                |ts_s| seconds_add(ts_s, scalar, sign).unwrap(),
+            )) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Millisecond, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap();
+            Arc::new(
+                unary::<TimestampMillisecondType, _, TimestampMillisecondType>(
+                    array,
+                    |ts_ms| milliseconds_add(ts_ms, scalar, sign).unwrap(),
+                ),
+            ) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Microsecond, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap();
+            Arc::new(
+                unary::<TimestampMicrosecondType, _, TimestampMicrosecondType>(
+                    array,
+                    |ts_us| microseconds_add(ts_us, scalar, sign).unwrap(),
+                ),
+            ) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap();
+            Arc::new(
+                unary::<TimestampNanosecondType, _, TimestampNanosecondType>(
+                    array,
+                    |ts_ns| nanoseconds_add(ts_ns, scalar, sign).unwrap(),
+                ),
+            ) as ArrayRef
+        }
+        _ => Err(DataFusionError::Execution(format!(
+            "Invalid lhs type for DateIntervalExpr: {}",
+            array.data_type()
+        )))?,
+    };
+    Ok(ColumnarValue::Array(ret))
+}
+
+fn evaluate_scalar(
+    operand: ScalarValue,
+    sign: i32,
+    scalar: &ScalarValue,
+) -> Result<ColumnarValue> {
+    let res = match operand {
+        ScalarValue::Date32(Some(days)) => {
+            let value = date32_add(days, scalar, sign)?;
+            ColumnarValue::Scalar(ScalarValue::Date32(Some(value)))
+        }
+        ScalarValue::Date64(Some(ms)) => {
+            let value = date64_add(ms, scalar, sign)?;
+            ColumnarValue::Scalar(ScalarValue::Date64(Some(value)))
+        }
+        ScalarValue::TimestampSecond(Some(ts_s), zone) => {
+            let value = seconds_add(ts_s, scalar, sign)?;
+            ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(value), zone))
+        }
+        ScalarValue::TimestampMillisecond(Some(ts_ms), zone) => {
+            let value = milliseconds_add(ts_ms, scalar, sign)?;
+            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(value), zone))
+        }
+        ScalarValue::TimestampMicrosecond(Some(ts_us), zone) => {
+            let value = microseconds_add(ts_us, scalar, sign)?;
+            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(value), zone))
+        }
+        ScalarValue::TimestampNanosecond(Some(ts_ns), zone) => {
+            let value = nanoseconds_add(ts_ns, scalar, sign)?;
+            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(value), zone))
+        }
+        _ => Err(DataFusionError::Execution(format!(
+            "Invalid lhs type {} for DateIntervalExpr",
+            operand.get_datatype()
+        )))?,
+    };
+    Ok(res)
+}
+
+#[inline]
+fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::days(days as i64));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_days() as i32)
+}
+
+#[inline]
+fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::milliseconds(ms));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_milliseconds())
+}
+
+#[inline]
+fn seconds_add(ts_s: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    Ok(do_data_time_math(ts_s, 0, scalar, sign)?.timestamp())
+}
+
+#[inline]
+fn milliseconds_add(ts_ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_ms / 1000;
+    let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
+    Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_millis())
+}
+
+#[inline]
+fn microseconds_add(ts_us: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_us / 1_000_000;
+    let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
+    Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos() / 1000)
+}
+
+#[inline]
+fn nanoseconds_add(ts_ns: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_ns / 1_000_000_000;
+    let nsecs = (ts_ns % 1_000_000_000) as u32;
+    Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos())
+}
+
+#[inline]
+fn do_data_time_math(
+    secs: i64,
+    nsecs: u32,
+    scalar: &ScalarValue,
+    sign: i32,
+) -> Result<NaiveDateTime> {
+    let prior = NaiveDateTime::from_timestamp(secs, nsecs);
+    do_date_math(prior, scalar, sign)
+}
+
+fn do_date_math<D>(prior: D, scalar: &ScalarValue, sign: i32) -> Result<D>
+where
+    D: Datelike + Add<Duration, Output = D>,
+{
+    Ok(match scalar {
+        ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
+        ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign),
+        ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
+        other => Err(DataFusionError::Execution(format!(
+            "DateIntervalExpr does not support non-interval type {:?}",
+            other
+        )))?,
+    })
+}
+
 // Can remove once https://github.com/apache/arrow-rs/pull/2031 is released

Review Comment:
   > how much of this can now be removed
   
   Unfortunately the new implementation requires an additional change to 
chrono. Please see my comment in #3153.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to