alamb commented on code in PR #3916: URL: https://github.com/apache/arrow-datafusion/pull/3916#discussion_r1002092173
########## datafusion/common/src/scalar.rs: ########## @@ -476,35 +480,144 @@ macro_rules! impl_op { (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => { primitive_op!(lhs, rhs, Int8, $OPERATION) } - _ => { - impl_distinct_cases_op!($LHS, $RHS, $OPERATION) + // Binary operations on arguments with different types: + (ScalarValue::Date32(Some(days)), _) => { + let value = date32_add(*days, $RHS, get_sign!($OPERATION))?; Review Comment: Since this is a generic macro, I am surprised to see `date32_add` -- I think this would likely surprise someone who tried to use this macro for `mul` or `div`. ########## datafusion/common/src/scalar.rs: ########## @@ -476,35 +480,144 @@ macro_rules! impl_op { (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => { primitive_op!(lhs, rhs, Int8, $OPERATION) } - _ => { - impl_distinct_cases_op!($LHS, $RHS, $OPERATION) + // Binary operations on arguments with different types: + (ScalarValue::Date32(Some(days)), _) => { + let value = date32_add(*days, $RHS, get_sign!($OPERATION))?; + Ok(ScalarValue::Date32(Some(value))) + } + (ScalarValue::Date64(Some(ms)), _) => { + let value = date64_add(*ms, $RHS, get_sign!($OPERATION))?; + Ok(ScalarValue::Date64(Some(value))) + } + (ScalarValue::TimestampSecond(Some(ts_s), zone), _) => { + let value = seconds_add(*ts_s, $RHS, get_sign!($OPERATION))?; + Ok(ScalarValue::TimestampSecond(Some(value), zone.clone())) + } + (ScalarValue::TimestampMillisecond(Some(ts_ms), zone), _) => { + let value = milliseconds_add(*ts_ms, $RHS, get_sign!($OPERATION))?; + Ok(ScalarValue::TimestampMillisecond(Some(value), zone.clone())) } + (ScalarValue::TimestampMicrosecond(Some(ts_us), zone), _) => { + let value = microseconds_add(*ts_us, $RHS, get_sign!($OPERATION))?; + Ok(ScalarValue::TimestampMicrosecond(Some(value), zone.clone())) + } + (ScalarValue::TimestampNanosecond(Some(ts_ns), zone), _) => { + let value = nanoseconds_add(*ts_ns, $RHS, get_sign!($OPERATION))?; +
Ok(ScalarValue::TimestampNanosecond(Some(value), zone.clone())) + } + _ => Err(DataFusionError::Internal(format!( + "Operator {} is not implemented for types {:?} and {:?}", + stringify!($OPERATION), + $LHS, + $RHS + ))), } }; } -// If we want a special implementation for an operation this is the place to implement it. -// For instance, in the future we may want to implement subtraction for dates but not addition. -// We can implement such special cases here. -macro_rules! impl_distinct_cases_op { - ($LHS:expr, $RHS:expr, +) => { - match ($LHS, $RHS) { - e => Err(DataFusionError::Internal(format!( - "Addition is not implemented for {:?}", - e - ))), - } +macro_rules! get_sign { + (+) => { + 1 }; - ($LHS:expr, $RHS:expr, -) => { - match ($LHS, $RHS) { - e => Err(DataFusionError::Internal(format!( - "Subtraction is not implemented for {:?}", - e - ))), - } + (-) => { + -1 }; } +#[inline] +pub fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> { Review Comment: In general I would like to consolidate date / time arithmetic into a single location, and ideally that location is arrow-rs. Thus I think we should be using the date math functions in arrow-rs -- specifically https://docs.rs/arrow/25.0.0/arrow/datatypes/struct.Date32Type.html I see that this code is simply moved in this PR, but in general what do you think? 
########## datafusion/common/src/test_util.rs: ########## @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one Review Comment: 👍 ########## datafusion/physical-expr/src/window/aggregate.rs: ########## @@ -163,19 +160,23 @@ fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>( .map(|value| { if value.is_null() { return Ok(value.clone()); - }; - let offset = ScalarValue::try_from_value(&value.get_datatype(), delta)?; + } if SEARCH_SIDE == is_descending { // TODO: Handle positive overflows - value.add(&offset) - } else if value.is_unsigned() && value < &offset { - ScalarValue::try_from_value(&value.get_datatype(), 0) + value.add(delta) Review Comment: 👍 ########## datafusion/core/src/physical_plan/planner.rs: ########## @@ -1367,6 +1367,69 @@ fn get_physical_expr_pair( let physical_name = physical_name(expr)?; Ok((physical_expr, physical_name)) } +/// Casts the ScalarValue `value` to column type once we have schema information +/// The resulting type is not necessarily same type with the `column_type`. For instance +/// if `column_type` is Timestamp the result is casted to Interval type. The reason is that +/// Operation between Timestamps is not meaningful, However operation between Timestamp and +/// Interval is valid. For basic types `column_type` is indeed the resulting type. +fn convert_to_column_type( Review Comment: This operation is typically called "coercion" and is handled in datafusion for other expression types here: https://github.com/apache/arrow-datafusion/blob/master/datafusion/expr/src/type_coercion.rs#L18-L32 (and submodule). Did you consider doing this conversion as part of coercion? One reason to do it as part of the normal coercion is that then the proper types will be present for operations such as constant folding / constant propagation. 
This might eventually allow for expressions like ```sql COUNT(*) OVER (ORDER BY ts RANGE BETWEEN INTERVAL '1' DAY + INTERVAL '1' DAY PRECEDING AND INTERVAL '3 DAY' FOLLOWING), ``` ########## datafusion/expr/src/window_frame.rs: ########## @@ -110,16 +104,15 @@ impl Default for WindowFrame { /// 4. <expr> FOLLOWING /// 5. UNBOUNDED FOLLOWING /// -/// in this implementation we'll only allow <expr> to be u64 (i.e. no dynamic boundary) -#[derive(Debug, Clone, Copy, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] Review Comment: 👍 ########## datafusion/expr/src/window_frame.rs: ########## @@ -132,70 +125,72 @@ pub enum WindowFrameBound { /// /// 5. UNBOUNDED FOLLOWING /// The frame boundary is the last row in the partition. - Following(Option<u64>), + Following(ScalarValue), } -impl From<ast::WindowFrameBound> for WindowFrameBound { - fn from(value: ast::WindowFrameBound) -> Self { - match value { - ast::WindowFrameBound::Preceding(v) => Self::Preceding(v), - ast::WindowFrameBound::Following(v) => Self::Following(v), +impl TryFrom<ast::WindowFrameBound> for WindowFrameBound { + type Error = DataFusionError; + + fn try_from(value: ast::WindowFrameBound) -> Result<Self> { + Ok(match value { + ast::WindowFrameBound::Preceding(Some(v)) => { + Self::Preceding(convert_frame_bound_to_scalar_value(*v)?) + } + ast::WindowFrameBound::Preceding(None) => { + Self::Preceding(ScalarValue::Utf8(None)) + } + ast::WindowFrameBound::Following(Some(v)) => { + Self::Following(convert_frame_bound_to_scalar_value(*v)?)
+ } + ast::WindowFrameBound::Following(None) => { + Self::Following(ScalarValue::Utf8(None)) + } ast::WindowFrameBound::CurrentRow => Self::CurrentRow, - } + }) } } -impl fmt::Display for WindowFrameBound { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"), - WindowFrameBound::Preceding(None) => f.write_str("UNBOUNDED PRECEDING"), - WindowFrameBound::Following(None) => f.write_str("UNBOUNDED FOLLOWING"), - WindowFrameBound::Preceding(Some(n)) => write!(f, "{} PRECEDING", n), - WindowFrameBound::Following(Some(n)) => write!(f, "{} FOLLOWING", n), +pub fn convert_frame_bound_to_scalar_value(v: ast::Expr) -> Result<ScalarValue> { + Ok(ScalarValue::Utf8(Some(match v { + ast::Expr::Value(ast::Value::Number(value, false)) + | ast::Expr::Value(ast::Value::SingleQuotedString(value)) => value, + ast::Expr::Interval { + value, + leading_field, + .. + } => { + let result = match *value { + ast::Expr::Value(ast::Value::SingleQuotedString(item)) => item, + e => { + let msg = format!("INTERVAL expression cannot be {:?}", e); + return Err(DataFusionError::Internal(msg)); Review Comment: Should this error be ParserError, or unsupported (rather than internal)? ########## datafusion/common/src/scalar.rs: ########## @@ -476,35 +480,144 @@ macro_rules! 
impl_op { (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => { primitive_op!(lhs, rhs, Int8, $OPERATION) } - _ => { - impl_distinct_cases_op!($LHS, $RHS, $OPERATION) + // Binary operations on arguments with different types: + (ScalarValue::Date32(Some(days)), _) => { + let value = date32_add(*days, $RHS, get_sign!($OPERATION))?; + Ok(ScalarValue::Date32(Some(value))) + } + (ScalarValue::Date64(Some(ms)), _) => { + let value = date64_add(*ms, $RHS, get_sign!($OPERATION))?; + Ok(ScalarValue::Date64(Some(value))) + } + (ScalarValue::TimestampSecond(Some(ts_s), zone), _) => { + let value = seconds_add(*ts_s, $RHS, get_sign!($OPERATION))?; + Ok(ScalarValue::TimestampSecond(Some(value), zone.clone())) + } + (ScalarValue::TimestampMillisecond(Some(ts_ms), zone), _) => { + let value = milliseconds_add(*ts_ms, $RHS, get_sign!($OPERATION))?; + Ok(ScalarValue::TimestampMillisecond(Some(value), zone.clone())) } + (ScalarValue::TimestampMicrosecond(Some(ts_us), zone), _) => { + let value = microseconds_add(*ts_us, $RHS, get_sign!($OPERATION))?; + Ok(ScalarValue::TimestampMicrosecond(Some(value), zone.clone())) + } + (ScalarValue::TimestampNanosecond(Some(ts_ns), zone), _) => { + let value = nanoseconds_add(*ts_ns, $RHS, get_sign!($OPERATION))?; + Ok(ScalarValue::TimestampNanosecond(Some(value), zone.clone())) + } + _ => Err(DataFusionError::Internal(format!( + "Operator {} is not implemented for types {:?} and {:?}", + stringify!($OPERATION), + $LHS, + $RHS + ))), } }; } -// If we want a special implementation for an operation this is the place to implement it. -// For instance, in the future we may want to implement subtraction for dates but not addition. -// We can implement such special cases here. -macro_rules! impl_distinct_cases_op { - ($LHS:expr, $RHS:expr, +) => { - match ($LHS, $RHS) { - e => Err(DataFusionError::Internal(format!( - "Addition is not implemented for {:?}", - e - ))), - } +macro_rules! 
get_sign { + (+) => { + 1 }; - ($LHS:expr, $RHS:expr, -) => { - match ($LHS, $RHS) { - e => Err(DataFusionError::Internal(format!( - "Subtraction is not implemented for {:?}", - e - ))), - } + (-) => { + -1 }; } +#[inline] +pub fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> { + let epoch = NaiveDate::from_ymd(1970, 1, 1); + let prior = epoch.add(Duration::days(days as i64)); + let posterior = do_date_math(prior, scalar, sign)?; + Ok(posterior.sub(epoch).num_days() as i32) +} + +#[inline] +pub fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> { + let epoch = NaiveDate::from_ymd(1970, 1, 1); + let prior = epoch.add(Duration::milliseconds(ms)); + let posterior = do_date_math(prior, scalar, sign)?; + Ok(posterior.sub(epoch).num_milliseconds()) +} + +#[inline] +pub fn seconds_add(ts_s: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> { + Ok(do_date_time_math(ts_s, 0, scalar, sign)?.timestamp()) +} + +#[inline] +pub fn milliseconds_add(ts_ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> { + let secs = ts_ms / 1000; + let nsecs = ((ts_ms % 1000) * 1_000_000) as u32; + Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_millis()) +} + +#[inline] +pub fn microseconds_add(ts_us: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> { + let secs = ts_us / 1_000_000; + let nsecs = ((ts_us % 1_000_000) * 1000) as u32; + Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos() / 1000) +} + +#[inline] +pub fn nanoseconds_add(ts_ns: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> { + let secs = ts_ns / 1_000_000_000; + let nsecs = (ts_ns % 1_000_000_000) as u32; + Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos()) +} + +#[inline] +fn do_date_time_math( + secs: i64, + nsecs: u32, + scalar: &ScalarValue, + sign: i32, +) -> Result<NaiveDateTime> { + let prior = NaiveDateTime::from_timestamp(secs, nsecs); + do_date_math(prior, scalar, sign) +} + +fn do_date_math<D>(prior: D, scalar: 
&ScalarValue, sign: i32) -> Result<D> +where + D: Datelike + Add<Duration, Output = D>, +{ + Ok(match scalar { + ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign), + ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign), + ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign), + other => Err(DataFusionError::Execution(format!( + "DateIntervalExpr does not support non-interval type {:?}", + other + )))?, + }) +} + +// Can remove once https://github.com/apache/arrow-rs/pull/2031 is released Review Comment: This has been released ########## datafusion/expr/src/window_frame.rs: ########## @@ -252,103 +247,32 @@ mod tests { }; let result = WindowFrame::try_from(window_frame); assert_eq!( - result.err().unwrap().to_string(), - "Execution error: Invalid window frame: start bound cannot be unbounded following" - .to_owned() - ); + result.err().unwrap().to_string(), + "Execution error: Invalid window frame: start bound cannot be unbounded following".to_owned() + ); Review Comment: You may be able to make this cleaner via https://doc.rust-lang.org/std/result/enum.Result.html#method.expect_err ```suggestion let err = WindowFrame::try_from(window_frame).unwrap_err(); assert_eq!( err.to_string(), "Execution error: Invalid window frame: start bound cannot be unbounded following".to_owned() );``` ########## datafusion/common/src/delta.rs: ########## @@ -49,7 +49,7 @@ fn normalise_day(year: i32, month: u32, day: u32) -> u32 { /// Shift a date by the given number of months. /// Ambiguous month-ends are shifted backwards as necessary. -pub(crate) fn shift_months<D: Datelike>(date: D, months: i32) -> D { +pub fn shift_months<D: Datelike>(date: D, months: i32) -> D { Review Comment: Moving these to common makes sense -- eventually I hope to move all this code into arrow-rs -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org