alamb commented on code in PR #3110:
URL: https://github.com/apache/arrow-datafusion/pull/3110#discussion_r945968921


##########
datafusion/core/tests/sql/timestamp.rs:
##########
@@ -1256,3 +1257,141 @@ async fn date_bin() {
         "Arrow error: External error: This feature is not implemented: 
DATE_BIN only supports literal values for the origin argument, not arrays"
     );
 }
+

Review Comment:
   These are really nice tests -- thank you @JasonLi-cn 



##########
datafusion/physical-expr/src/expressions/datetime.rs:
##########
@@ -130,38 +120,196 @@ impl PhysicalExpr for DateIntervalExpr {
             }
         };
 
-        // Do math
-        let posterior = match scalar {
-            ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, 
sign),
-            ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i 
* sign),
-            ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, 
*i, sign),
-            other => Err(DataFusionError::Execution(format!(
-                "DateIntervalExpr does not support non-interval type {:?}",
-                other
-            )))?,
-        };
-
-        // convert back
-        let res = match operand {
-            ScalarValue::Date32(Some(_)) => {
-                let days = posterior.sub(epoch).num_days() as i32;
-                ColumnarValue::Scalar(ScalarValue::Date32(Some(days)))
-            }
-            ScalarValue::Date64(Some(_)) => {
-                let ms = posterior.sub(epoch).num_milliseconds();
-                ColumnarValue::Scalar(ScalarValue::Date64(Some(ms)))
-            }
-            _ => Err(DataFusionError::Execution(format!(
-                "Invalid lhs type for DateIntervalExpr: {}",
-                scalar
-            )))?,
-        };
-        Ok(res)
+        match dates {
+            ColumnarValue::Scalar(operand) => evaluate_scalar(operand, sign, 
intervals),
+            ColumnarValue::Array(array) => evaluate_array(array, sign, 
intervals),
+        }
     }
 }
 
+pub fn evaluate_array(
+    array: ArrayRef,
+    sign: i32,
+    scalar: &ScalarValue,
+) -> Result<ColumnarValue> {
+    let ret = match array.data_type() {
+        DataType::Date32 => {
+            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
+            Arc::new(unary::<Date32Type, _, Date32Type>(array, |days| {
+                date32_add(days, scalar, sign).unwrap()
+            })) as ArrayRef
+        }
+        DataType::Date64 => {
+            let array = array.as_any().downcast_ref::<Date64Array>().unwrap();
+            Arc::new(unary::<Date64Type, _, Date64Type>(array, |ms| {
+                date64_add(ms, scalar, sign).unwrap()
+            })) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Second, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampSecondArray>()
+                .unwrap();
+            Arc::new(unary::<TimestampSecondType, _, TimestampSecondType>(
+                array,
+                |ts_s| seconds_add(ts_s, scalar, sign).unwrap(),
+            )) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Millisecond, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap();
+            Arc::new(
+                unary::<TimestampMillisecondType, _, TimestampMillisecondType>(
+                    array,
+                    |ts_ms| milliseconds_add(ts_ms, scalar, sign).unwrap(),
+                ),
+            ) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Microsecond, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap();
+            Arc::new(
+                unary::<TimestampMicrosecondType, _, TimestampMicrosecondType>(
+                    array,
+                    |ts_us| microseconds_add(ts_us, scalar, sign).unwrap(),
+                ),
+            ) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap();
+            Arc::new(
+                unary::<TimestampNanosecondType, _, TimestampNanosecondType>(
+                    array,
+                    |ts_ns| nanoseconds_add(ts_ns, scalar, sign).unwrap(),
+                ),
+            ) as ArrayRef
+        }
+        _ => Err(DataFusionError::Execution(format!(
+            "Invalid lhs type for DateIntervalExpr: {}",
+            array.data_type()
+        )))?,
+    };
+    Ok(ColumnarValue::Array(ret))
+}
+
+fn evaluate_scalar(
+    operand: ScalarValue,
+    sign: i32,
+    scalar: &ScalarValue,
+) -> Result<ColumnarValue> {
+    let res = match operand {
+        ScalarValue::Date32(Some(days)) => {
+            let value = date32_add(days, scalar, sign)?;
+            ColumnarValue::Scalar(ScalarValue::Date32(Some(value)))
+        }
+        ScalarValue::Date64(Some(ms)) => {
+            let value = date64_add(ms, scalar, sign)?;
+            ColumnarValue::Scalar(ScalarValue::Date64(Some(value)))
+        }
+        ScalarValue::TimestampSecond(Some(ts_s), zone) => {
+            let value = seconds_add(ts_s, scalar, sign)?;
+            ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(value), 
zone))
+        }
+        ScalarValue::TimestampMillisecond(Some(ts_ms), zone) => {
+            let value = milliseconds_add(ts_ms, scalar, sign)?;
+            
ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(value), zone))
+        }
+        ScalarValue::TimestampMicrosecond(Some(ts_us), zone) => {
+            let value = microseconds_add(ts_us, scalar, sign)?;
+            
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(value), zone))
+        }
+        ScalarValue::TimestampNanosecond(Some(ts_ns), zone) => {
+            let value = nanoseconds_add(ts_ns, scalar, sign)?;
+            
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(value), zone))
+        }
+        _ => Err(DataFusionError::Execution(format!(
+            "Invalid lhs type {} for DateIntervalExpr",
+            operand.get_datatype()
+        )))?,
+    };
+    Ok(res)
+}
+
+#[inline]
+fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::days(days as i64));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_days() as i32)
+}
+
+#[inline]
+fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::milliseconds(ms));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_milliseconds())
+}
+
+#[inline]
+fn seconds_add(ts_s: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    Ok(do_data_time_math(ts_s, 0, scalar, sign)?.timestamp())
+}
+
+#[inline]
+fn milliseconds_add(ts_ms: i64, scalar: &ScalarValue, sign: i32) -> 
Result<i64> {
+    let secs = ts_ms / 1000;
+    let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
+    Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_millis())
+}
+
+#[inline]
+fn microseconds_add(ts_us: i64, scalar: &ScalarValue, sign: i32) -> 
Result<i64> {
+    let secs = ts_us / 1_000_000;
+    let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
+    Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos() / 1000)
+}
+
+#[inline]
+fn nanoseconds_add(ts_ns: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> 
{
+    let secs = ts_ns / 1_000_000_000;
+    let nsecs = (ts_ns % 1_000_000_000) as u32;
+    Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos())
+}
+
+#[inline]
+fn do_data_time_math(
+    secs: i64,
+    nsecs: u32,
+    scalar: &ScalarValue,
+    sign: i32,
+) -> Result<NaiveDateTime> {
+    let prior = NaiveDateTime::from_timestamp(secs, nsecs);
+    do_date_math(prior, scalar, sign)
+}
+
+fn do_date_math<D>(prior: D, scalar: &ScalarValue, sign: i32) -> Result<D>
+where
+    D: Datelike + Add<Duration, Output = D>,
+{
+    Ok(match scalar {
+        ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
+        ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * 
sign),
+        ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, 
sign),
+        other => Err(DataFusionError::Execution(format!(
+            "DateIntervalExpr does not support non-interval type {:?}",
+            other
+        )))?,
+    })
+}
+
 // Can remove once https://github.com/apache/arrow-rs/pull/2031 is released

Review Comment:
   I believe this has been released -- maybe @avantgardnerio can figure out 
how much of this can now be removed now that it exists in arrow (or how much we 
should push back upstream) 🤔 



##########
datafusion/physical-expr/src/expressions/datetime.rs:
##########
@@ -112,71 +120,166 @@ impl PhysicalExpr for DateIntervalExpr {
             }
         };
 
-        // Unwrap days since epoch
-        let operand = match dates {
-            ColumnarValue::Scalar(scalar) => scalar,
-            _ => Err(DataFusionError::Execution(
-                "Columnar execution is not yet supported for DateIntervalExpr"
-                    .to_string(),
-            ))?,
-        };
-
-        let res = match operand {
-            ScalarValue::Date32(Some(d)) => {
-                let epoch = NaiveDate::from_ymd(1970, 1, 1);
-                let prior = epoch.add(Duration::days(d as i64));
-                let posterior = do_date_math(prior, scalar, sign)?;
-                let days = posterior.sub(epoch).num_days() as i32;
-                ColumnarValue::Scalar(ScalarValue::Date32(Some(days)))
-            }
-            ScalarValue::Date64(Some(ms)) => {
-                let epoch = NaiveDate::from_ymd(1970, 1, 1);
-                let prior = epoch.add(Duration::milliseconds(ms));
-                let posterior = do_date_math(prior, scalar, sign)?;
-                let ms = posterior.sub(epoch).num_milliseconds();
-                ColumnarValue::Scalar(ScalarValue::Date64(Some(ms)))
-            }
-            ScalarValue::TimestampSecond(Some(ts_s), zone) => {
-                let value = do_data_time_math(ts_s, 0, scalar, 
sign)?.timestamp();
-                
ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(value), zone))
-            }
-            ScalarValue::TimestampMillisecond(Some(ts_ms), zone) => {
-                let secs = ts_ms / 1000;
-                let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
-                let value =
-                    do_data_time_math(secs, nsecs, scalar, 
sign)?.timestamp_millis();
-                ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
-                    Some(value),
-                    zone,
-                ))
-            }
-            ScalarValue::TimestampMicrosecond(Some(ts_us), zone) => {
-                let secs = ts_us / 1_000_000;
-                let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
-                let value = do_data_time_math(secs, nsecs, scalar, sign)?
-                    .timestamp_nanos()
-                    / 1000;
-                ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
-                    Some(value),
-                    zone,
-                ))
-            }
-            ScalarValue::TimestampNanosecond(Some(ts_ns), zone) => {
-                let secs = ts_ns / 1_000_000_000;
-                let nsecs = (ts_ns % 1_000_000_000) as u32;
-                let value =
-                    do_data_time_math(secs, nsecs, scalar, 
sign)?.timestamp_nanos();
-                
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(value), zone))
-            }
-            _ => Err(DataFusionError::Execution(format!(
-                "Invalid lhs type for DateIntervalExpr: {}",
-                scalar
-            )))?,
-        };
-        Ok(res)
+        match dates {
+            ColumnarValue::Scalar(operand) => evaluate_scalar(operand, sign, 
intervals),
+            ColumnarValue::Array(array) => evaluate_array(array, sign, 
intervals),
+        }
     }
 }
 
+pub fn evaluate_array(
+    array: ArrayRef,
+    sign: i32,
+    scalar: &ScalarValue,
+) -> Result<ColumnarValue> {
+    let ret = match array.data_type() {
+        DataType::Date32 => {
+            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();

Review Comment:
   But in other words, I think this PR is ok as written. 



##########
datafusion/core/tests/sql/timestamp.rs:
##########
@@ -1256,3 +1257,141 @@ async fn date_bin() {
         "Arrow error: External error: This feature is not implemented: 
DATE_BIN only supports literal values for the origin argument, not arrays"
     );
 }
+
+#[tokio::test]
+async fn timestamp_add_interval_second() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "SELECT NOW(), NOW() + INTERVAL '1' SECOND;";

Review Comment:
   In SQL, when `now()` is invoked more than once in the same statement, it 
should return exactly the same answer (which is why it is marked `stable` and 
not `volatile` -- a somewhat special case).
   
   This is partially described in 
https://github.com/apache/arrow-datafusion/blob/a4fa44f79e58f241a3afbcfa71d2de8de4790c33/datafusion/physical-expr/src/datetime_expressions.rs#L169-L174
   
   So in other words, I think this test as written is 👍 and not subject to any 
race conditions



##########
datafusion/physical-expr/src/expressions/datetime.rs:
##########
@@ -112,71 +120,166 @@ impl PhysicalExpr for DateIntervalExpr {
             }
         };
 
-        // Unwrap days since epoch
-        let operand = match dates {
-            ColumnarValue::Scalar(scalar) => scalar,
-            _ => Err(DataFusionError::Execution(
-                "Columnar execution is not yet supported for DateIntervalExpr"
-                    .to_string(),
-            ))?,
-        };
-
-        let res = match operand {
-            ScalarValue::Date32(Some(d)) => {
-                let epoch = NaiveDate::from_ymd(1970, 1, 1);
-                let prior = epoch.add(Duration::days(d as i64));
-                let posterior = do_date_math(prior, scalar, sign)?;
-                let days = posterior.sub(epoch).num_days() as i32;
-                ColumnarValue::Scalar(ScalarValue::Date32(Some(days)))
-            }
-            ScalarValue::Date64(Some(ms)) => {
-                let epoch = NaiveDate::from_ymd(1970, 1, 1);
-                let prior = epoch.add(Duration::milliseconds(ms));
-                let posterior = do_date_math(prior, scalar, sign)?;
-                let ms = posterior.sub(epoch).num_milliseconds();
-                ColumnarValue::Scalar(ScalarValue::Date64(Some(ms)))
-            }
-            ScalarValue::TimestampSecond(Some(ts_s), zone) => {
-                let value = do_data_time_math(ts_s, 0, scalar, 
sign)?.timestamp();
-                
ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(value), zone))
-            }
-            ScalarValue::TimestampMillisecond(Some(ts_ms), zone) => {
-                let secs = ts_ms / 1000;
-                let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
-                let value =
-                    do_data_time_math(secs, nsecs, scalar, 
sign)?.timestamp_millis();
-                ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
-                    Some(value),
-                    zone,
-                ))
-            }
-            ScalarValue::TimestampMicrosecond(Some(ts_us), zone) => {
-                let secs = ts_us / 1_000_000;
-                let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
-                let value = do_data_time_math(secs, nsecs, scalar, sign)?
-                    .timestamp_nanos()
-                    / 1000;
-                ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
-                    Some(value),
-                    zone,
-                ))
-            }
-            ScalarValue::TimestampNanosecond(Some(ts_ns), zone) => {
-                let secs = ts_ns / 1_000_000_000;
-                let nsecs = (ts_ns % 1_000_000_000) as u32;
-                let value =
-                    do_data_time_math(secs, nsecs, scalar, 
sign)?.timestamp_nanos();
-                
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(value), zone))
-            }
-            _ => Err(DataFusionError::Execution(format!(
-                "Invalid lhs type for DateIntervalExpr: {}",
-                scalar
-            )))?,
-        };
-        Ok(res)
+        match dates {
+            ColumnarValue::Scalar(operand) => evaluate_scalar(operand, sign, 
intervals),
+            ColumnarValue::Array(array) => evaluate_array(array, sign, 
intervals),
+        }
     }
 }
 
+pub fn evaluate_array(
+    array: ArrayRef,
+    sign: i32,
+    scalar: &ScalarValue,
+) -> Result<ColumnarValue> {
+    let ret = match array.data_type() {
+        DataType::Date32 => {
+            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();

Review Comment:
   I think by this point in the plan it is OK to `panic!`, as a failure here 
indicates a bug in the code, and having to always do the ceremony of checking 
the type makes the code less readable. Maybe we could make a function like 
`as_date32_array` that returns an error (similar to the arrow ones, which panic)? 
https://docs.rs/arrow/20.0.0/arrow/array/index.html#functions
   
   Sadly, it appears we are inconsistent in the codebase in this regard: 
https://github.com/apache/arrow-datafusion/search?l=Rust&q=downcast 😢 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to