alamb commented on code in PR #3916:
URL: https://github.com/apache/arrow-datafusion/pull/3916#discussion_r1002092173


##########
datafusion/common/src/scalar.rs:
##########
@@ -476,35 +480,144 @@ macro_rules! impl_op {
             (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
                 primitive_op!(lhs, rhs, Int8, $OPERATION)
             }
-            _ => {
-                impl_distinct_cases_op!($LHS, $RHS, $OPERATION)
+            // Binary operations on arguments with different types:
+            (ScalarValue::Date32(Some(days)), _) => {
+                let value = date32_add(*days, $RHS, get_sign!($OPERATION))?;

Review Comment:
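   Since this is a generic macro, I am surprised to see `date32_add` here -- it would likely surprise anyone who tried to use this macro for `mul` or `div` (as written, `get_sign!` only has rules for `+` and `-`, so expanding this macro with `*` or `/` would not even compile).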



##########
datafusion/common/src/scalar.rs:
##########
@@ -476,35 +480,144 @@ macro_rules! impl_op {
             (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
                 primitive_op!(lhs, rhs, Int8, $OPERATION)
             }
-            _ => {
-                impl_distinct_cases_op!($LHS, $RHS, $OPERATION)
+            // Binary operations on arguments with different types:
+            (ScalarValue::Date32(Some(days)), _) => {
+                let value = date32_add(*days, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::Date32(Some(value)))
+            }
+            (ScalarValue::Date64(Some(ms)), _) => {
+                let value = date64_add(*ms, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::Date64(Some(value)))
+            }
+            (ScalarValue::TimestampSecond(Some(ts_s), zone), _) => {
+                let value = seconds_add(*ts_s, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampSecond(Some(value), zone.clone()))
+            }
+            (ScalarValue::TimestampMillisecond(Some(ts_ms), zone), _) => {
+                let value = milliseconds_add(*ts_ms, $RHS, 
get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampMillisecond(Some(value), 
zone.clone()))
             }
+            (ScalarValue::TimestampMicrosecond(Some(ts_us), zone), _) => {
+                let value = microseconds_add(*ts_us, $RHS, 
get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampMicrosecond(Some(value), 
zone.clone()))
+            }
+            (ScalarValue::TimestampNanosecond(Some(ts_ns), zone), _) => {
+                let value = nanoseconds_add(*ts_ns, $RHS, 
get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampNanosecond(Some(value), zone.clone()))
+            }
+            _ => Err(DataFusionError::Internal(format!(
+                "Operator {} is not implemented for types {:?} and {:?}",
+                stringify!($OPERATION),
+                $LHS,
+                $RHS
+            ))),
         }
     };
 }
 
-// If we want a special implementation for an operation this is the place to 
implement it.
-// For instance, in the future we may want to implement subtraction for dates 
but not addition.
-// We can implement such special cases here.
-macro_rules! impl_distinct_cases_op {
-    ($LHS:expr, $RHS:expr, +) => {
-        match ($LHS, $RHS) {
-            e => Err(DataFusionError::Internal(format!(
-                "Addition is not implemented for {:?}",
-                e
-            ))),
-        }
+macro_rules! get_sign {
+    (+) => {
+        1
     };
-    ($LHS:expr, $RHS:expr, -) => {
-        match ($LHS, $RHS) {
-            e => Err(DataFusionError::Internal(format!(
-                "Subtraction is not implemented for {:?}",
-                e
-            ))),
-        }
+    (-) => {
+        -1
     };
 }
 
+#[inline]
+pub fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {

Review Comment:
   In general, I would like to consolidate date/time arithmetic into a single location, and ideally that location is arrow-rs.
   
   Thus, I think we should be using the date math functions in arrow-rs -- specifically https://docs.rs/arrow/25.0.0/arrow/datatypes/struct.Date32Type.html
   
   I see that this code is simply moved in this PR, but in general, what do you think?



##########
datafusion/common/src/test_util.rs:
##########
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   👍 



##########
datafusion/physical-expr/src/window/aggregate.rs:
##########
@@ -163,19 +160,23 @@ fn calculate_index_of_row<const BISECT_SIDE: bool, const 
SEARCH_SIDE: bool>(
             .map(|value| {
                 if value.is_null() {
                     return Ok(value.clone());
-                };
-                let offset = 
ScalarValue::try_from_value(&value.get_datatype(), delta)?;
+                }
                 if SEARCH_SIDE == is_descending {
                     // TODO: Handle positive overflows
-                    value.add(&offset)
-                } else if value.is_unsigned() && value < &offset {
-                    ScalarValue::try_from_value(&value.get_datatype(), 0)
+                    value.add(delta)

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -1367,6 +1367,69 @@ fn get_physical_expr_pair(
     let physical_name = physical_name(expr)?;
     Ok((physical_expr, physical_name))
 }
+/// Casts the ScalarValue `value` to column type once we have schema 
information
+/// The resulting type is not necessarily same type with the `column_type`. 
For instance
+/// if `column_type` is Timestamp the result is casted to Interval type. The 
reason is that
+/// Operation between Timestamps is not meaningful, However operation between 
Timestamp and
+/// Interval is valid. For basic types `column_type` is indeed the resulting 
type.
+fn convert_to_column_type(

Review Comment:
   This operation is typically called "coercion", and DataFusion handles it for other expression types here: https://github.com/apache/arrow-datafusion/blob/master/datafusion/expr/src/type_coercion.rs#L18-L32 (and its submodules).
   
   Did you consider doing this conversion as part of coercion?
   
   One reason to do it as part of the normal coercion is that the proper types would then be present for operations such as constant folding / constant propagation. This might eventually allow expressions like
   
   ```sql
                   COUNT(*) OVER (ORDER BY ts RANGE BETWEEN INTERVAL '1' DAY + INTERVAL '1' DAY PRECEDING AND INTERVAL '3 DAY' FOLLOWING),
   ```
   
   to be supported.
   
   



##########
datafusion/expr/src/window_frame.rs:
##########
@@ -110,16 +104,15 @@ impl Default for WindowFrame {
 /// 4. <expr> FOLLOWING
 /// 5. UNBOUNDED FOLLOWING
 ///
-/// in this implementation we'll only allow <expr> to be u64 (i.e. no dynamic 
boundary)
-#[derive(Debug, Clone, Copy, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]

Review Comment:
   👍 



##########
datafusion/expr/src/window_frame.rs:
##########
@@ -132,70 +125,72 @@ pub enum WindowFrameBound {
     ///
     /// 5. UNBOUNDED FOLLOWING
     /// The frame boundary is the last row in the partition.
-    Following(Option<u64>),
+    Following(ScalarValue),
 }
 
-impl From<ast::WindowFrameBound> for WindowFrameBound {
-    fn from(value: ast::WindowFrameBound) -> Self {
-        match value {
-            ast::WindowFrameBound::Preceding(v) => Self::Preceding(v),
-            ast::WindowFrameBound::Following(v) => Self::Following(v),
+impl TryFrom<ast::WindowFrameBound> for WindowFrameBound {
+    type Error = DataFusionError;
+
+    fn try_from(value: ast::WindowFrameBound) -> Result<Self> {
+        Ok(match value {
+            ast::WindowFrameBound::Preceding(Some(v)) => {
+                Self::Preceding(convert_frame_bound_to_scalar_value(*v)?)
+            }
+            ast::WindowFrameBound::Preceding(None) => {
+                Self::Preceding(ScalarValue::Utf8(None))
+            }
+            ast::WindowFrameBound::Following(Some(v)) => {
+                Self::Following(convert_frame_bound_to_scalar_value(*v)?)
+            }
+            ast::WindowFrameBound::Following(None) => {
+                Self::Following(ScalarValue::Utf8(None))
+            }
             ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
-        }
+        })
     }
 }
 
-impl fmt::Display for WindowFrameBound {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
-            WindowFrameBound::Preceding(None) => f.write_str("UNBOUNDED 
PRECEDING"),
-            WindowFrameBound::Following(None) => f.write_str("UNBOUNDED 
FOLLOWING"),
-            WindowFrameBound::Preceding(Some(n)) => write!(f, "{} PRECEDING", 
n),
-            WindowFrameBound::Following(Some(n)) => write!(f, "{} FOLLOWING", 
n),
+pub fn convert_frame_bound_to_scalar_value(v: ast::Expr) -> 
Result<ScalarValue> {
+    Ok(ScalarValue::Utf8(Some(match v {
+        ast::Expr::Value(ast::Value::Number(value, false))
+        | ast::Expr::Value(ast::Value::SingleQuotedString(value)) => value,
+        ast::Expr::Interval {
+            value,
+            leading_field,
+            ..
+        } => {
+            let result = match *value {
+                ast::Expr::Value(ast::Value::SingleQuotedString(item)) => item,
+                e => {
+                    let msg = format!("INTERVAL expression cannot be {:?}", e);
+                    return Err(DataFusionError::Internal(msg));

Review Comment:
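   Should this error be a `ParserError` or an "unsupported" (not implemented) error, rather than `Internal`? An INTERVAL frame bound whose value is not a single-quoted string is a problem with the user's query, not an internal bug.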



##########
datafusion/common/src/scalar.rs:
##########
@@ -476,35 +480,144 @@ macro_rules! impl_op {
             (ScalarValue::Int8(lhs), ScalarValue::Int8(rhs)) => {
                 primitive_op!(lhs, rhs, Int8, $OPERATION)
             }
-            _ => {
-                impl_distinct_cases_op!($LHS, $RHS, $OPERATION)
+            // Binary operations on arguments with different types:
+            (ScalarValue::Date32(Some(days)), _) => {
+                let value = date32_add(*days, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::Date32(Some(value)))
+            }
+            (ScalarValue::Date64(Some(ms)), _) => {
+                let value = date64_add(*ms, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::Date64(Some(value)))
+            }
+            (ScalarValue::TimestampSecond(Some(ts_s), zone), _) => {
+                let value = seconds_add(*ts_s, $RHS, get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampSecond(Some(value), zone.clone()))
+            }
+            (ScalarValue::TimestampMillisecond(Some(ts_ms), zone), _) => {
+                let value = milliseconds_add(*ts_ms, $RHS, 
get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampMillisecond(Some(value), 
zone.clone()))
             }
+            (ScalarValue::TimestampMicrosecond(Some(ts_us), zone), _) => {
+                let value = microseconds_add(*ts_us, $RHS, 
get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampMicrosecond(Some(value), 
zone.clone()))
+            }
+            (ScalarValue::TimestampNanosecond(Some(ts_ns), zone), _) => {
+                let value = nanoseconds_add(*ts_ns, $RHS, 
get_sign!($OPERATION))?;
+                Ok(ScalarValue::TimestampNanosecond(Some(value), zone.clone()))
+            }
+            _ => Err(DataFusionError::Internal(format!(
+                "Operator {} is not implemented for types {:?} and {:?}",
+                stringify!($OPERATION),
+                $LHS,
+                $RHS
+            ))),
         }
     };
 }
 
-// If we want a special implementation for an operation this is the place to 
implement it.
-// For instance, in the future we may want to implement subtraction for dates 
but not addition.
-// We can implement such special cases here.
-macro_rules! impl_distinct_cases_op {
-    ($LHS:expr, $RHS:expr, +) => {
-        match ($LHS, $RHS) {
-            e => Err(DataFusionError::Internal(format!(
-                "Addition is not implemented for {:?}",
-                e
-            ))),
-        }
+macro_rules! get_sign {
+    (+) => {
+        1
     };
-    ($LHS:expr, $RHS:expr, -) => {
-        match ($LHS, $RHS) {
-            e => Err(DataFusionError::Internal(format!(
-                "Subtraction is not implemented for {:?}",
-                e
-            ))),
-        }
+    (-) => {
+        -1
     };
 }
 
+#[inline]
+pub fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::days(days as i64));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_days() as i32)
+}
+
+#[inline]
+pub fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::milliseconds(ms));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_milliseconds())
+}
+
+#[inline]
+pub fn seconds_add(ts_s: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    Ok(do_date_time_math(ts_s, 0, scalar, sign)?.timestamp())
+}
+
+#[inline]
+pub fn milliseconds_add(ts_ms: i64, scalar: &ScalarValue, sign: i32) -> 
Result<i64> {
+    let secs = ts_ms / 1000;
+    let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
+    Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_millis())
+}
+
+#[inline]
+pub fn microseconds_add(ts_us: i64, scalar: &ScalarValue, sign: i32) -> 
Result<i64> {
+    let secs = ts_us / 1_000_000;
+    let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
+    Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos() / 1000)
+}
+
+#[inline]
+pub fn nanoseconds_add(ts_ns: i64, scalar: &ScalarValue, sign: i32) -> 
Result<i64> {
+    let secs = ts_ns / 1_000_000_000;
+    let nsecs = (ts_ns % 1_000_000_000) as u32;
+    Ok(do_date_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos())
+}
+
+#[inline]
+fn do_date_time_math(
+    secs: i64,
+    nsecs: u32,
+    scalar: &ScalarValue,
+    sign: i32,
+) -> Result<NaiveDateTime> {
+    let prior = NaiveDateTime::from_timestamp(secs, nsecs);
+    do_date_math(prior, scalar, sign)
+}
+
+fn do_date_math<D>(prior: D, scalar: &ScalarValue, sign: i32) -> Result<D>
+where
+    D: Datelike + Add<Duration, Output = D>,
+{
+    Ok(match scalar {
+        ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
+        ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * 
sign),
+        ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, 
sign),
+        other => Err(DataFusionError::Execution(format!(
+            "DateIntervalExpr does not support non-interval type {:?}",
+            other
+        )))?,
+    })
+}
+
+// Can remove once https://github.com/apache/arrow-rs/pull/2031 is released

Review Comment:
   This has been released, so per the comment above, this workaround can now be removed.



##########
datafusion/expr/src/window_frame.rs:
##########
@@ -252,103 +247,32 @@ mod tests {
         };
         let result = WindowFrame::try_from(window_frame);
         assert_eq!(
-      result.err().unwrap().to_string(),
-      "Execution error: Invalid window frame: start bound cannot be unbounded 
following"
-        .to_owned()
-    );
+            result.err().unwrap().to_string(),
+            "Execution error: Invalid window frame: start bound cannot be 
unbounded following".to_owned()
+        );

Review Comment:
   You may be able to make this cleaner via `Result::unwrap_err` / `Result::expect_err` (https://doc.rust-lang.org/std/result/enum.Result.html#method.expect_err):
   
   
   ```suggestion
           let err = WindowFrame::try_from(window_frame).unwrap_err();
           assert_eq!(
               err.to_string(),
               "Execution error: Invalid window frame: start bound cannot be unbounded following".to_owned()
           );
   ```



##########
datafusion/common/src/delta.rs:
##########
@@ -49,7 +49,7 @@ fn normalise_day(year: i32, month: u32, day: u32) -> u32 {
 
 /// Shift a date by the given number of months.
 /// Ambiguous month-ends are shifted backwards as necessary.
-pub(crate) fn shift_months<D: Datelike>(date: D, months: i32) -> D {
+pub fn shift_months<D: Datelike>(date: D, months: i32) -> D {

Review Comment:
   Moving these to `common` makes sense -- eventually, I hope to move all of this code into arrow-rs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to