alamb commented on code in PR #3110:
URL: https://github.com/apache/arrow-datafusion/pull/3110#discussion_r946117554
##########
datafusion/physical-expr/src/expressions/datetime.rs:
##########
@@ -130,38 +120,196 @@ impl PhysicalExpr for DateIntervalExpr {
}
};
- // Do math
- let posterior = match scalar {
- ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
- ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign),
- ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
- other => Err(DataFusionError::Execution(format!(
- "DateIntervalExpr does not support non-interval type {:?}",
- other
- )))?,
- };
-
- // convert back
- let res = match operand {
- ScalarValue::Date32(Some(_)) => {
- let days = posterior.sub(epoch).num_days() as i32;
- ColumnarValue::Scalar(ScalarValue::Date32(Some(days)))
- }
- ScalarValue::Date64(Some(_)) => {
- let ms = posterior.sub(epoch).num_milliseconds();
- ColumnarValue::Scalar(ScalarValue::Date64(Some(ms)))
- }
- _ => Err(DataFusionError::Execution(format!(
- "Invalid lhs type for DateIntervalExpr: {}",
- scalar
- )))?,
- };
- Ok(res)
+ match dates {
+ ColumnarValue::Scalar(operand) => evaluate_scalar(operand, sign, intervals),
+ ColumnarValue::Array(array) => evaluate_array(array, sign, intervals),
+ }
}
}
+pub fn evaluate_array(
+ array: ArrayRef,
+ sign: i32,
+ scalar: &ScalarValue,
+) -> Result<ColumnarValue> {
+ let ret = match array.data_type() {
+ DataType::Date32 => {
+ let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
+ Arc::new(unary::<Date32Type, _, Date32Type>(array, |days| {
+ date32_add(days, scalar, sign).unwrap()
+ })) as ArrayRef
+ }
+ DataType::Date64 => {
+ let array = array.as_any().downcast_ref::<Date64Array>().unwrap();
+ Arc::new(unary::<Date64Type, _, Date64Type>(array, |ms| {
+ date64_add(ms, scalar, sign).unwrap()
+ })) as ArrayRef
+ }
+ DataType::Timestamp(TimeUnit::Second, _) => {
+ let array = array
+ .as_any()
+ .downcast_ref::<TimestampSecondArray>()
+ .unwrap();
+ Arc::new(unary::<TimestampSecondType, _, TimestampSecondType>(
+ array,
+ |ts_s| seconds_add(ts_s, scalar, sign).unwrap(),
+ )) as ArrayRef
+ }
+ DataType::Timestamp(TimeUnit::Millisecond, _) => {
+ let array = array
+ .as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .unwrap();
+ Arc::new(
+ unary::<TimestampMillisecondType, _, TimestampMillisecondType>(
+ array,
+ |ts_ms| milliseconds_add(ts_ms, scalar, sign).unwrap(),
+ ),
+ ) as ArrayRef
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) => {
+ let array = array
+ .as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .unwrap();
+ Arc::new(
+ unary::<TimestampMicrosecondType, _, TimestampMicrosecondType>(
+ array,
+ |ts_us| microseconds_add(ts_us, scalar, sign).unwrap(),
+ ),
+ ) as ArrayRef
+ }
+ DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+ let array = array
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .unwrap();
+ Arc::new(
+ unary::<TimestampNanosecondType, _, TimestampNanosecondType>(
+ array,
+ |ts_ns| nanoseconds_add(ts_ns, scalar, sign).unwrap(),
+ ),
+ ) as ArrayRef
+ }
+ _ => Err(DataFusionError::Execution(format!(
+ "Invalid lhs type for DateIntervalExpr: {}",
+ array.data_type()
+ )))?,
+ };
+ Ok(ColumnarValue::Array(ret))
+}
+
+fn evaluate_scalar(
+ operand: ScalarValue,
+ sign: i32,
+ scalar: &ScalarValue,
+) -> Result<ColumnarValue> {
+ let res = match operand {
+ ScalarValue::Date32(Some(days)) => {
+ let value = date32_add(days, scalar, sign)?;
+ ColumnarValue::Scalar(ScalarValue::Date32(Some(value)))
+ }
+ ScalarValue::Date64(Some(ms)) => {
+ let value = date64_add(ms, scalar, sign)?;
+ ColumnarValue::Scalar(ScalarValue::Date64(Some(value)))
+ }
+ ScalarValue::TimestampSecond(Some(ts_s), zone) => {
+ let value = seconds_add(ts_s, scalar, sign)?;
+ ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(value), zone))
+ }
+ ScalarValue::TimestampMillisecond(Some(ts_ms), zone) => {
+ let value = milliseconds_add(ts_ms, scalar, sign)?;
+ ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(value), zone))
+ }
+ ScalarValue::TimestampMicrosecond(Some(ts_us), zone) => {
+ let value = microseconds_add(ts_us, scalar, sign)?;
+ ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(value), zone))
+ }
+ ScalarValue::TimestampNanosecond(Some(ts_ns), zone) => {
+ let value = nanoseconds_add(ts_ns, scalar, sign)?;
+ ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(value), zone))
+ }
+ _ => Err(DataFusionError::Execution(format!(
+ "Invalid lhs type {} for DateIntervalExpr",
+ operand.get_datatype()
+ )))?,
+ };
+ Ok(res)
+}
+
+#[inline]
+fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
+ let epoch = NaiveDate::from_ymd(1970, 1, 1);
+ let prior = epoch.add(Duration::days(days as i64));
+ let posterior = do_date_math(prior, scalar, sign)?;
+ Ok(posterior.sub(epoch).num_days() as i32)
+}
+
+#[inline]
+fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+ let epoch = NaiveDate::from_ymd(1970, 1, 1);
+ let prior = epoch.add(Duration::milliseconds(ms));
+ let posterior = do_date_math(prior, scalar, sign)?;
+ Ok(posterior.sub(epoch).num_milliseconds())
+}
+
+#[inline]
+fn seconds_add(ts_s: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+ Ok(do_data_time_math(ts_s, 0, scalar, sign)?.timestamp())
+}
+
+#[inline]
+fn milliseconds_add(ts_ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+ let secs = ts_ms / 1000;
+ let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
+ Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_millis())
+}
+
+#[inline]
+fn microseconds_add(ts_us: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+ let secs = ts_us / 1_000_000;
+ let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
+ Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos() / 1000)
+}
+
+#[inline]
+fn nanoseconds_add(ts_ns: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+ let secs = ts_ns / 1_000_000_000;
+ let nsecs = (ts_ns % 1_000_000_000) as u32;
+ Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos())
+}
+
+#[inline]
+fn do_data_time_math(
+ secs: i64,
+ nsecs: u32,
+ scalar: &ScalarValue,
+ sign: i32,
+) -> Result<NaiveDateTime> {
+ let prior = NaiveDateTime::from_timestamp(secs, nsecs);
+ do_date_math(prior, scalar, sign)
+}
+
+fn do_date_math<D>(prior: D, scalar: &ScalarValue, sign: i32) -> Result<D>
+where
+ D: Datelike + Add<Duration, Output = D>,
+{
+ Ok(match scalar {
+ ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
+ ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign),
+ ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
+ other => Err(DataFusionError::Execution(format!(
+ "DateIntervalExpr does not support non-interval type {:?}",
+ other
+ )))?,
+ })
+}
+
// Can remove once https://github.com/apache/arrow-rs/pull/2031 is released
Review Comment:
Some day we'll get there!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]