alamb commented on code in PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916#discussion_r1002092173
##########
datafusion/common/src/scalar.rs:
##########
@@ -476,35 +480,144 @@ macro_rules! impl_op {
(ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
primitive_op!(lhs, rhs, Int8, $OPERATION)
}
- _ => {
- impl_distinct_cases_op!($LHS, $RHS, $OPERATION)
+ // Binary operations on arguments with different types:
+ (ScalarValue::Date32(Some(days)), _) => {
+ let value = date32_add(*days, $RHS, get_sign!($OPERATION))?;
Review Comment:
Since this is a generic macro, I am surprised to see `date32_add` here -- I
think this would be surprising to someone who tried to use this macro for
`mul` or `div`.
##########
datafusion/common/src/scalar.rs:
##########
@@ -476,35 +480,144 @@ macro_rules! impl_op {
(ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
primitive_op!(lhs, rhs, Int8, $OPERATION)
}
- _ => {
- impl_distinct_cases_op!($LHS, $RHS, $OPERATION)
+ // Binary operations on arguments with different types:
+ (ScalarValue::Date32(Some(days)), _) => {
+ let value = date32_add(*days, $RHS, get_sign!($OPERATION))?;
+ Ok(ScalarValue::Date32(Some(value)))
+ }
+ (ScalarValue::Date64(Some(ms)), _) => {
+ let value = date64_add(*ms, $RHS, get_sign!($OPERATION))?;
+ Ok(ScalarValue::Date64(Some(value)))
+ }
+ (ScalarValue::TimestampSecond(Some(ts_s), zone), _) => {
+ let value = seconds_add(*ts_s, $RHS, get_sign!($OPERATION))?;
+ Ok(ScalarValue::TimestampSecond(Some(value), zone.clone()))
+ }
+ (ScalarValue::TimestampMillisecond(Some(ts_ms), zone), _) => {
+ let value = milliseconds_add(*ts_ms, $RHS,
get_sign!($OPERATION))?;
+ Ok(ScalarValue::TimestampMillisecond(Some(value),
zone.clone()))
}
+ (ScalarValue::TimestampMicrosecond(Some(ts_us), zone), _) => {
+ let value = microseconds_add(*ts_us, $RHS,
get_sign!($OPERATION))?;
+ Ok(ScalarValue::TimestampMicrosecond(Some(value),
zone.clone()))
+ }
+ (ScalarValue::TimestampNanosecond(Some(ts_ns), zone), _) => {
+ let value = nanoseconds_add(*ts_ns, $RHS,
get_sign!($OPERATION))?;
+ Ok(ScalarValue::TimestampNanosecond(Some(value), zone.clone()))
+ }
+ _ => Err(DataFusionError::Internal(format!(
+ "Operator {} is not implemented for types {:?} and {:?}",
+ stringify!($OPERATION),
+ $LHS,
+ $RHS
+ ))),
}
};
}
-// If we want a special implementation for an operation this is the place to
implement it.
-// For instance, in the future we may want to implement subtraction for dates
but not addition.
-// We can implement such special cases here.
-macro_rules! impl_distinct_cases_op {
- ($LHS:expr, $RHS:expr, +) => {
- match ($LHS, $RHS) {
- e => Err(DataFusionError::Internal(format!(
- "Addition is not implemented for {:?}",
- e
- ))),
- }
+macro_rules! get_sign {
+ (+) => {
+ 1
};
- ($LHS:expr, $RHS:expr, -) => {
- match ($LHS, $RHS) {
- e => Err(DataFusionError::Internal(format!(
- "Subtraction is not implemented for {:?}",
- e
- ))),
- }
+ (-) => {
+ -1
};
}
+#[inline]
+pub fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
Review Comment:
In general I would like to consolidate date / time arithmetic into a single
location, and ideally that location is arrow-rs.
Thus I think we should be using the date math functions in arrow-rs --
specifically
https://docs.rs/arrow/25.0.0/arrow/datatypes/struct.Date32Type.html
I see that this code is simply moved in this PR, but in general what do you
think?
##########
datafusion/common/src/test_util.rs:
##########
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
Review Comment:
👍
##########
datafusion/physical-expr/src/window/aggregate.rs:
##########
@@ -163,19 +160,23 @@ fn calculate_index_of_row<const BISECT_SIDE: bool, const
SEARCH_SIDE: bool>(
.map(|value| {
if value.is_null() {
return Ok(value.clone());
- };
- let offset =
ScalarValue::try_from_value(&value.get_datatype(), delta)?;
+ }
if SEARCH_SIDE == is_descending {
// TODO: Handle positive overflows
- value.add(&offset)
- } else if value.is_unsigned() && value < &offset {
- ScalarValue::try_from_value(&value.get_datatype(), 0)
+ value.add(delta)
Review Comment:
👍
##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -1367,6 +1367,69 @@ fn get_physical_expr_pair(
let physical_name = physical_name(expr)?;
Ok((physical_expr, physical_name))
}
+/// Casts the ScalarValue `value` to column type once we have schema
information
+/// The resulting type is not necessarily same type with the `column_type`.
For instance
+/// if `column_type` is Timestamp the result is casted to Interval type. The
reason is that
+/// Operation between Timestamps is not meaningful, However operation between
Timestamp and
+/// Interval is valid. For basic types `column_type` is indeed the resulting
type.
+fn convert_to_column_type(
Review Comment:
This operation is typically called "coercion" and is handled in datafusion
for other expression types here:
https://github.com/apache/arrow-datafusion/blob/master/datafusion/expr/src/type_coercion.rs#L18-L32
(and submodule).
Did you consider doing this conversion as part of coercion?
One reason to do it as part of the normal coercion is that the proper
types will then be present for operations such as constant folding / constant
propagation. This might eventually allow for expressions like
```sql
COUNT(*) OVER (ORDER BY ts RANGE BETWEEN INTERVAL '1' DAY +
INTERVAL '1' DAY PRECEDING AND INTERVAL '3 DAY' FOLLOWING),
```
##########
datafusion/expr/src/window_frame.rs:
##########
@@ -110,16 +104,15 @@ impl Default for WindowFrame {
/// 4. <expr> FOLLOWING
/// 5. UNBOUNDED FOLLOWING
///
-/// in this implementation we'll only allow <expr> to be u64 (i.e. no dynamic
boundary)
-#[derive(Debug, Clone, Copy, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Review Comment:
👍
##########
datafusion/expr/src/window_frame.rs:
##########
@@ -132,70 +125,72 @@ pub enum WindowFrameBound {
///
/// 5. UNBOUNDED FOLLOWING
/// The frame boundary is the last row in the partition.
- Following(Option<u64>),
+ Following(ScalarValue),
}
-impl From<ast::WindowFrameBound> for WindowFrameBound {
- fn from(value: ast::WindowFrameBound) -> Self {
- match value {
- ast::WindowFrameBound::Preceding(v) => Self::Preceding(v),
- ast::WindowFrameBound::Following(v) => Self::Following(v),
+impl TryFrom<ast::WindowFrameBound> for WindowFrameBound {
+ type Error = DataFusionError;
+
+ fn try_from(value: ast::WindowFrameBound) -> Result<Self> {
+ Ok(match value {
+ ast::WindowFrameBound::Preceding(Some(v)) => {
+ Self::Preceding(convert_frame_bound_to_scalar_value(*v)?)
+ }
+ ast::WindowFrameBound::Preceding(None) => {
+ Self::Preceding(ScalarValue::Utf8(None))
+ }
+ ast::WindowFrameBound::Following(Some(v)) => {
+ Self::Following(convert_frame_bound_to_scalar_value(*v)?)
+ }
+ ast::WindowFrameBound::Following(None) => {
+ Self::Following(ScalarValue::Utf8(None))
+ }
ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
- }
+ })
}
}
-impl fmt::Display for WindowFrameBound {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match self {
- WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
- WindowFrameBound::Preceding(None) => f.write_str("UNBOUNDED
PRECEDING"),
- WindowFrameBound::Following(None) => f.write_str("UNBOUNDED
FOLLOWING"),
- WindowFrameBound::Preceding(Some(n)) => write!(f, "{} PRECEDING",
n),
- WindowFrameBound::Following(Some(n)) => write!(f, "{} FOLLOWING",
n),
+pub fn convert_frame_bound_to_scalar_value(v: ast::Expr) ->
Result<ScalarValue> {
+ Ok(ScalarValue::Utf8(Some(match v {
+ ast::Expr::Value(ast::Value::Number(value, false))
+ | ast::Expr::Value(ast::Value::SingleQuotedString(value)) => value,
+ ast::Expr::Interval {
+ value,
+ leading_field,
+ ..
+ } => {
+ let result = match *value {
+ ast::Expr::Value(ast::Value::SingleQuotedString(item)) => item,
+ e => {
+ let msg = format!("INTERVAL expression cannot be {:?}", e);
+ return Err(DataFusionError::Internal(msg));
Review Comment:
Should this error be a ParserError or an "unsupported" error, rather than an
internal error?
##########
datafusion/common/src/scalar.rs:
##########
@@ -476,35 +480,144 @@ macro_rules! impl_op {
(ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
primitive_op!(lhs, rhs, Int8, $OPERATION)
}
- _ => {
- impl_distinct_cases_op!($LHS, $RHS, $OPERATION)
+ // Binary operations on arguments with different types:
+ (ScalarValue::Date32(Some(days)), _) => {
+ let value = date32_add(*days, $RHS, get_sign!($OPERATION))?;
+ Ok(ScalarValue::Date32(Some(value)))
+ }
+ (ScalarValue::Date64(Some(ms)), _) => {
+ let value = date64_add(*ms, $RHS, get_sign!($OPERATION))?;
+ Ok(ScalarValue::Date64(Some(value)))
+ }
+ (ScalarValue::TimestampSecond(Some(ts_s), zone), _) => {
+ let value = seconds_add(*ts_s, $RHS, get_sign!($OPERATION))?;
+ Ok(ScalarValue::TimestampSecond(Some(value), zone.clone()))
+ }
+ (ScalarValue::TimestampMillisecond(Some(ts_ms), zone), _) => {
+ let value = milliseconds_add(*ts_ms, $RHS,
get_sign!($OPERATION))?;
+ Ok(ScalarValue::TimestampMillisecond(Some(value),
zone.clone()))
}
+ (ScalarValue::TimestampMicrosecond(Some(ts_us), zone), _) => {
+ let value = microseconds_add(*ts_us, $RHS,
get_sign!($OPERATION))?;
+ Ok(ScalarValue::TimestampMicrosecond(Some(value),
zone.clone()))
+ }
+ (ScalarValue::TimestampNanosecond(Some(ts_ns), zone), _) => {
+ let value = nanoseconds_add(*ts_ns, $RHS,
get_sign!($OPERATION))?;
+ Ok(ScalarValue::TimestampNanosecond(Some(value), zone.clone()))
+ }
+ _ => Err(DataFusionError::Internal(format!(
+ "Operator {} is not implemented for types {:?} and {:?}",
+ stringify!($OPERATION),
+ $LHS,
+ $RHS
+ ))),
}
};
}
-// If we want a special implementation for an operation this is the place to
implement it.
-// For instance, in the future we may want to implement subtraction for dates
but not addition.
-// We can implement such special cases here.
-macro_rules! impl_distinct_cases_op {
- ($LHS:expr, $RHS:expr, +) => {
- match ($LHS, $RHS) {
- e => Err(DataFusionError::Internal(format!(
- "Addition is not implemented for {:?}",
- e
- ))),
- }
+macro_rules! get_sign {
+ (+) => {
+ 1
};
- ($LHS:expr, $RHS:expr, -) => {
- match ($LHS, $RHS) {
- e => Err(DataFusionError::Internal(format!(
- "Subtraction is not implemented for {:?}",
- e
- ))),
- }
+ (-) => {
+ -1
};
}
+#[inline]
+pub fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
+ let epoch = NaiveDate::from_ymd(1970, 1, 1);
+ let prior = epoch.add(Duration::days(days as i64));
+ let posterior = do_date_math(prior, scalar, sign)?;
+ Ok(posterior.sub(epoch).num_days() as i32)
+}
+
+#[inline]
+pub fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+ let epoch = NaiveDate::from_ymd(1970, 1, 1);
+ let prior = epoch.add(Duration::milliseconds(ms));
+ let posterior = do_date_math(prior, scalar, sign)?;
+ Ok(posterior.sub(epoch).num_milliseconds())
+}
+
+#[inline]
+pub fn seconds_add(ts_s: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+ Ok(do_date_time_math(ts_s, 0, scalar, sign)?.timestamp())
+}
+
+#[inline]
+pub fn milliseconds_add(ts_ms: i64, scalar: &ScalarValue, sign: i32) ->
Result<i64> {
+ let secs = ts_ms / 1000;
+ let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
+ Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_millis())
+}
+
+#[inline]
+pub fn microseconds_add(ts_us: i64, scalar: &ScalarValue, sign: i32) ->
Result<i64> {
+ let secs = ts_us / 1_000_000;
+ let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
+ Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos() / 1000)
+}
+
+#[inline]
+pub fn nanoseconds_add(ts_ns: i64, scalar: &ScalarValue, sign: i32) ->
Result<i64> {
+ let secs = ts_ns / 1_000_000_000;
+ let nsecs = (ts_ns % 1_000_000_000) as u32;
+ Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos())
+}
+
+#[inline]
+fn do_date_time_math(
+ secs: i64,
+ nsecs: u32,
+ scalar: &ScalarValue,
+ sign: i32,
+) -> Result<NaiveDateTime> {
+ let prior = NaiveDateTime::from_timestamp(secs, nsecs);
+ do_date_math(prior, scalar, sign)
+}
+
+fn do_date_math<D>(prior: D, scalar: &ScalarValue, sign: i32) -> Result<D>
+where
+ D: Datelike + Add<Duration, Output = D>,
+{
+ Ok(match scalar {
+ ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
+ ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i *
sign),
+ ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i,
sign),
+ other => Err(DataFusionError::Execution(format!(
+ "DateIntervalExpr does not support non-interval type {:?}",
+ other
+ )))?,
+ })
+}
+
+// Can remove once https://github.com/apache/arrow-rs/pull/2031 is released
Review Comment:
The arrow-rs PR referenced in this comment has been released, so this can now
be removed.
##########
datafusion/expr/src/window_frame.rs:
##########
@@ -252,103 +247,32 @@ mod tests {
};
let result = WindowFrame::try_from(window_frame);
assert_eq!(
- result.err().unwrap().to_string(),
- "Execution error: Invalid window frame: start bound cannot be unbounded
following"
- .to_owned()
- );
+ result.err().unwrap().to_string(),
+ "Execution error: Invalid window frame: start bound cannot be
unbounded following".to_owned()
+ );
Review Comment:
You may be able to make this cleaner via
https://doc.rust-lang.org/std/result/enum.Result.html#method.unwrap_err
```suggestion
let err = WindowFrame::try_from(window_frame).unwrap_err();
assert_eq!(
err.to_string(),
"Execution error: Invalid window frame: start bound cannot be unbounded following".to_owned()
);
```
##########
datafusion/common/src/delta.rs:
##########
@@ -49,7 +49,7 @@ fn normalise_day(year: i32, month: u32, day: u32) -> u32 {
/// Shift a date by the given number of months.
/// Ambiguous month-ends are shifted backwards as necessary.
-pub(crate) fn shift_months<D: Datelike>(date: D, months: i32) -> D {
+pub fn shift_months<D: Datelike>(date: D, months: i32) -> D {
Review Comment:
Moving these to common makes sense -- eventually I hope to move all this
code into arrow-rs
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]