avantgardnerio commented on code in PR #3110:
URL: https://github.com/apache/arrow-datafusion/pull/3110#discussion_r945834995
##########
datafusion/physical-expr/src/expressions/datetime.rs:
##########
@@ -112,71 +120,166 @@ impl PhysicalExpr for DateIntervalExpr {
}
};
- // Unwrap days since epoch
- let operand = match dates {
- ColumnarValue::Scalar(scalar) => scalar,
- _ => Err(DataFusionError::Execution(
- "Columnar execution is not yet supported for DateIntervalExpr"
- .to_string(),
- ))?,
- };
-
- let res = match operand {
- ScalarValue::Date32(Some(d)) => {
- let epoch = NaiveDate::from_ymd(1970, 1, 1);
- let prior = epoch.add(Duration::days(d as i64));
- let posterior = do_date_math(prior, scalar, sign)?;
- let days = posterior.sub(epoch).num_days() as i32;
- ColumnarValue::Scalar(ScalarValue::Date32(Some(days)))
- }
- ScalarValue::Date64(Some(ms)) => {
- let epoch = NaiveDate::from_ymd(1970, 1, 1);
- let prior = epoch.add(Duration::milliseconds(ms));
- let posterior = do_date_math(prior, scalar, sign)?;
- let ms = posterior.sub(epoch).num_milliseconds();
- ColumnarValue::Scalar(ScalarValue::Date64(Some(ms)))
- }
- ScalarValue::TimestampSecond(Some(ts_s), zone) => {
- let value = do_data_time_math(ts_s, 0, scalar,
sign)?.timestamp();
-
ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(value), zone))
- }
- ScalarValue::TimestampMillisecond(Some(ts_ms), zone) => {
- let secs = ts_ms / 1000;
- let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
- let value =
- do_data_time_math(secs, nsecs, scalar,
sign)?.timestamp_millis();
- ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
- Some(value),
- zone,
- ))
- }
- ScalarValue::TimestampMicrosecond(Some(ts_us), zone) => {
- let secs = ts_us / 1_000_000;
- let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
- let value = do_data_time_math(secs, nsecs, scalar, sign)?
- .timestamp_nanos()
- / 1000;
- ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
- Some(value),
- zone,
- ))
- }
- ScalarValue::TimestampNanosecond(Some(ts_ns), zone) => {
- let secs = ts_ns / 1_000_000_000;
- let nsecs = (ts_ns % 1_000_000_000) as u32;
- let value =
- do_data_time_math(secs, nsecs, scalar,
sign)?.timestamp_nanos();
-
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(value), zone))
- }
- _ => Err(DataFusionError::Execution(format!(
- "Invalid lhs type for DateIntervalExpr: {}",
- scalar
- )))?,
- };
- Ok(res)
+ match dates {
+ ColumnarValue::Scalar(operand) => evaluate_scalar(operand, sign,
intervals),
+ ColumnarValue::Array(array) => evaluate_array(array, sign,
intervals),
+ }
}
}
+pub fn evaluate_array(
+ array: ArrayRef,
+ sign: i32,
+ scalar: &ScalarValue,
+) -> Result<ColumnarValue> {
+ let ret = match array.data_type() {
+ DataType::Date32 => {
+ let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
+ Arc::new(unary::<Date32Type, _, Date32Type>(array, |days| {
+ date32_add(days, scalar, sign).unwrap()
+ })) as ArrayRef
+ }
+ DataType::Date64 => {
+ let array = array.as_any().downcast_ref::<Date64Array>().unwrap();
+ Arc::new(unary::<Date64Type, _, Date64Type>(array, |ms| {
+ date64_add(ms, scalar, sign).unwrap()
+ })) as ArrayRef
+ }
+ DataType::Timestamp(TimeUnit::Second, _) => {
+ let array = array
+ .as_any()
+ .downcast_ref::<TimestampSecondArray>()
+ .unwrap();
+ Arc::new(unary::<TimestampSecondType, _, TimestampSecondType>(
+ array,
+ |ts_s| seconds_add(ts_s, scalar, sign).unwrap(),
+ )) as ArrayRef
+ }
+ DataType::Timestamp(TimeUnit::Millisecond, _) => {
+ let array = array
+ .as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .unwrap();
+ Arc::new(
+ unary::<TimestampMillisecondType, _, TimestampMillisecondType>(
+ array,
+ |ts_ms| milliseconds_add(ts_ms, scalar, sign).unwrap(),
+ ),
+ ) as ArrayRef
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) => {
+ let array = array
+ .as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .unwrap();
+ Arc::new(
+ unary::<TimestampMicrosecondType, _, TimestampMicrosecondType>(
+ array,
+ |ts_us| microseconds_add(ts_us, scalar, sign).unwrap(),
+ ),
+ ) as ArrayRef
+ }
+ DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+ let array = array
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .unwrap();
+ Arc::new(
+ unary::<TimestampNanosecondType, _, TimestampNanosecondType>(
+ array,
+ |ts_ns| nanoseconds_add(ts_ns, scalar, sign).unwrap(),
+ ),
+ ) as ArrayRef
+ }
+ _ => Err(DataFusionError::Execution(format!(
+ "Invalid lhs type for DateIntervalExpr: {}",
+ array.data_type()
+ )))?,
+ };
+ Ok(ColumnarValue::Array(ret))
+}
+
+fn evaluate_scalar(
+ operand: ScalarValue,
+ sign: i32,
+ scalar: &ScalarValue,
+) -> Result<ColumnarValue> {
+ let res = match operand {
+ ScalarValue::Date32(Some(days)) => {
+ let value = date32_add(days, scalar, sign)?;
+ ColumnarValue::Scalar(ScalarValue::Date32(Some(value)))
+ }
+ ScalarValue::Date64(Some(ms)) => {
+ let value = date64_add(ms, scalar, sign)?;
+ ColumnarValue::Scalar(ScalarValue::Date64(Some(value)))
+ }
+ ScalarValue::TimestampSecond(Some(ts_s), zone) => {
+ let value = seconds_add(ts_s, scalar, sign)?;
+ ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(value),
zone))
+ }
+ ScalarValue::TimestampMillisecond(Some(ts_ms), zone) => {
+ let value = milliseconds_add(ts_ms, scalar, sign)?;
+
ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(value), zone))
+ }
+ ScalarValue::TimestampMicrosecond(Some(ts_us), zone) => {
+ let value = microseconds_add(ts_us, scalar, sign)?;
+
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(value), zone))
+ }
+ ScalarValue::TimestampNanosecond(Some(ts_ns), zone) => {
+ let value = nanoseconds_add(ts_ns, scalar, sign)?;
+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(value), zone))
+ }
+ _ => Err(DataFusionError::Execution(format!(
+ "Invalid lhs type {} for DateIntervalExpr",
+ operand.get_datatype()
+ )))?,
+ };
+ Ok(res)
+}
+
+#[inline]
+fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
+ let epoch = NaiveDate::from_ymd(1970, 1, 1);
+ let prior = epoch.add(Duration::days(days as i64));
+ let posterior = do_date_math(prior, scalar, sign)?;
+ Ok(posterior.sub(epoch).num_days() as i32)
+}
+
+#[inline]
+fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+ let epoch = NaiveDate::from_ymd(1970, 1, 1);
+ let prior = epoch.add(Duration::milliseconds(ms));
+ let posterior = do_date_math(prior, scalar, sign)?;
+ Ok(posterior.sub(epoch).num_milliseconds())
+}
+
+#[inline]
+fn seconds_add(ts_s: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+ Ok(do_data_time_math(ts_s, 0, scalar, sign)?.timestamp())
Review Comment:
Should this be `do_date_time_math()` instead of `do_data_time_math()`?
##########
datafusion/physical-expr/src/expressions/datetime.rs:
##########
@@ -112,71 +120,166 @@ impl PhysicalExpr for DateIntervalExpr {
}
};
- // Unwrap days since epoch
- let operand = match dates {
- ColumnarValue::Scalar(scalar) => scalar,
- _ => Err(DataFusionError::Execution(
- "Columnar execution is not yet supported for DateIntervalExpr"
- .to_string(),
- ))?,
- };
-
- let res = match operand {
- ScalarValue::Date32(Some(d)) => {
- let epoch = NaiveDate::from_ymd(1970, 1, 1);
- let prior = epoch.add(Duration::days(d as i64));
- let posterior = do_date_math(prior, scalar, sign)?;
- let days = posterior.sub(epoch).num_days() as i32;
- ColumnarValue::Scalar(ScalarValue::Date32(Some(days)))
- }
- ScalarValue::Date64(Some(ms)) => {
- let epoch = NaiveDate::from_ymd(1970, 1, 1);
- let prior = epoch.add(Duration::milliseconds(ms));
- let posterior = do_date_math(prior, scalar, sign)?;
- let ms = posterior.sub(epoch).num_milliseconds();
- ColumnarValue::Scalar(ScalarValue::Date64(Some(ms)))
- }
- ScalarValue::TimestampSecond(Some(ts_s), zone) => {
- let value = do_data_time_math(ts_s, 0, scalar,
sign)?.timestamp();
-
ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(value), zone))
- }
- ScalarValue::TimestampMillisecond(Some(ts_ms), zone) => {
- let secs = ts_ms / 1000;
- let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
- let value =
- do_data_time_math(secs, nsecs, scalar,
sign)?.timestamp_millis();
- ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
- Some(value),
- zone,
- ))
- }
- ScalarValue::TimestampMicrosecond(Some(ts_us), zone) => {
- let secs = ts_us / 1_000_000;
- let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
- let value = do_data_time_math(secs, nsecs, scalar, sign)?
- .timestamp_nanos()
- / 1000;
- ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
- Some(value),
- zone,
- ))
- }
- ScalarValue::TimestampNanosecond(Some(ts_ns), zone) => {
- let secs = ts_ns / 1_000_000_000;
- let nsecs = (ts_ns % 1_000_000_000) as u32;
- let value =
- do_data_time_math(secs, nsecs, scalar,
sign)?.timestamp_nanos();
-
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(value), zone))
- }
- _ => Err(DataFusionError::Execution(format!(
- "Invalid lhs type for DateIntervalExpr: {}",
- scalar
- )))?,
- };
- Ok(res)
+ match dates {
+ ColumnarValue::Scalar(operand) => evaluate_scalar(operand, sign,
intervals),
+ ColumnarValue::Array(array) => evaluate_array(array, sign,
intervals),
Review Comment:
Nice!
##########
datafusion/physical-expr/src/expressions/datetime.rs:
##########
@@ -112,71 +120,166 @@ impl PhysicalExpr for DateIntervalExpr {
}
};
- // Unwrap days since epoch
- let operand = match dates {
- ColumnarValue::Scalar(scalar) => scalar,
- _ => Err(DataFusionError::Execution(
- "Columnar execution is not yet supported for DateIntervalExpr"
- .to_string(),
- ))?,
- };
-
- let res = match operand {
- ScalarValue::Date32(Some(d)) => {
- let epoch = NaiveDate::from_ymd(1970, 1, 1);
- let prior = epoch.add(Duration::days(d as i64));
- let posterior = do_date_math(prior, scalar, sign)?;
- let days = posterior.sub(epoch).num_days() as i32;
- ColumnarValue::Scalar(ScalarValue::Date32(Some(days)))
- }
- ScalarValue::Date64(Some(ms)) => {
- let epoch = NaiveDate::from_ymd(1970, 1, 1);
- let prior = epoch.add(Duration::milliseconds(ms));
- let posterior = do_date_math(prior, scalar, sign)?;
- let ms = posterior.sub(epoch).num_milliseconds();
- ColumnarValue::Scalar(ScalarValue::Date64(Some(ms)))
- }
- ScalarValue::TimestampSecond(Some(ts_s), zone) => {
- let value = do_data_time_math(ts_s, 0, scalar,
sign)?.timestamp();
-
ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(value), zone))
- }
- ScalarValue::TimestampMillisecond(Some(ts_ms), zone) => {
- let secs = ts_ms / 1000;
- let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
- let value =
- do_data_time_math(secs, nsecs, scalar,
sign)?.timestamp_millis();
- ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
- Some(value),
- zone,
- ))
- }
- ScalarValue::TimestampMicrosecond(Some(ts_us), zone) => {
- let secs = ts_us / 1_000_000;
- let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
- let value = do_data_time_math(secs, nsecs, scalar, sign)?
- .timestamp_nanos()
- / 1000;
- ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
- Some(value),
- zone,
- ))
- }
- ScalarValue::TimestampNanosecond(Some(ts_ns), zone) => {
- let secs = ts_ns / 1_000_000_000;
- let nsecs = (ts_ns % 1_000_000_000) as u32;
- let value =
- do_data_time_math(secs, nsecs, scalar,
sign)?.timestamp_nanos();
-
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(value), zone))
- }
- _ => Err(DataFusionError::Execution(format!(
- "Invalid lhs type for DateIntervalExpr: {}",
- scalar
- )))?,
- };
- Ok(res)
+ match dates {
+ ColumnarValue::Scalar(operand) => evaluate_scalar(operand, sign,
intervals),
+ ColumnarValue::Array(array) => evaluate_array(array, sign,
intervals),
+ }
}
}
+pub fn evaluate_array(
+ array: ArrayRef,
+ sign: i32,
+ scalar: &ScalarValue,
+) -> Result<ColumnarValue> {
+ let ret = match array.data_type() {
+ DataType::Date32 => {
+ let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
Review Comment:
I know this shouldn't panic, but since this function returns a `Result` anyway,
would it improve auditability to use the error propagation operator (`?`) here
instead of `unwrap()`?
##########
datafusion/core/tests/sql/timestamp.rs:
##########
@@ -1256,3 +1257,141 @@ async fn date_bin() {
"Arrow error: External error: This feature is not implemented:
DATE_BIN only supports literal values for the origin argument, not arrays"
);
}
+
+#[tokio::test]
+async fn timestamp_add_interval_second() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let sql = "SELECT NOW(), NOW() + INTERVAL '1' SECOND;";
Review Comment:
Will `NOW()` be reliable when used this way? Is there a race here that could
make this test fail intermittently?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]