This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ef89e1b625 minor: reduce replication in `date_bin` implementation (#5673)
ef89e1b625 is described below
commit ef89e1b6251bc7a5061627e1f1865553c2731de4
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Mar 22 10:52:36 2023 +0100
minor: reduce replication in `date_bin` implementation (#5673)
* minor: reduce replication in date_bin
* clippy
---
.../physical-expr/src/datetime_expressions.rs | 95 +++-------------------
1 file changed, 12 insertions(+), 83 deletions(-)
diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs
index 70f053328d..2763ff981c 100644
--- a/datafusion/physical-expr/src/datetime_expressions.rs
+++ b/datafusion/physical-expr/src/datetime_expressions.rs
@@ -320,9 +320,14 @@ fn date_bin_single(stride: i64, source: i64, origin: i64) -> i64 {
/// DATE_BIN sql function
pub fn date_bin(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() == 2 {
- date_bin_2args(args)
+ // Default to unix EPOCH
+ let origin = ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
+ Some(0),
+ Some("+00:00".to_owned()),
+ ));
+ date_bin_impl(&args[0], &args[1], &origin)
} else if args.len() == 3 {
- date_bin_3args(args)
+ date_bin_impl(&args[0], &args[1], &args[2])
} else {
Err(DataFusionError::Execution(
"DATE_BIN expected two or three arguments".to_string(),
@@ -330,87 +335,11 @@ pub fn date_bin(args: &[ColumnarValue]) -> Result<ColumnarValue> {
}
}
-fn date_bin_3args(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- let (stride, array, origin) = (&args[0], &args[1], &args[2]);
-
- let stride = match stride {
- ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
- let (days, ms) = IntervalDayTimeType::to_parts(*v);
- let nanos = (Duration::days(days as i64) +
Duration::milliseconds(ms as i64))
- .num_nanoseconds();
- match nanos {
- Some(v) => v,
- _ => {
- return Err(DataFusionError::Execution(
- "DATE_BIN stride argument is too large".to_string(),
- ))
- }
- }
- }
- ColumnarValue::Scalar(v) => {
- return Err(DataFusionError::Execution(format!(
- "DATE_BIN expects stride argument to be an INTERVAL but got
{}",
- v.get_datatype()
- )))
- }
- ColumnarValue::Array(_) => return Err(DataFusionError::NotImplemented(
- "DATE_BIN only supports literal values for the stride argument,
not arrays"
- .to_string(),
- )),
- };
-
- let origin = match origin {
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) =>
*v,
- ColumnarValue::Scalar(v) => {
- return Err(DataFusionError::Execution(format!(
- "DATE_BIN expects origin argument to be a TIMESTAMP but got
{}",
- v.get_datatype()
- )))
- }
- ColumnarValue::Array(_) => return Err(DataFusionError::NotImplemented(
- "DATE_BIN only supports literal values for the origin argument,
not arrays"
- .to_string(),
- )),
- };
-
- let f = |x: Option<i64>| x.map(|x| date_bin_single(stride, x, origin));
-
- Ok(match array {
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(f(*v),
tz_opt.clone()))
- }
- ColumnarValue::Array(array) => match array.data_type() {
- DataType::Timestamp(TimeUnit::Nanosecond, _) => {
- let array = as_timestamp_nanosecond_array(array)?
- .iter()
- .map(f)
- .collect::<TimestampNanosecondArray>();
-
- ColumnarValue::Array(Arc::new(array))
- }
- _ => {
- return Err(DataFusionError::Execution(format!(
- "DATE_BIN expects source argument to be a TIMESTAMP but
got {}",
- array.data_type()
- )))
- }
- },
- _ => {
- return Err(DataFusionError::Execution(
- "DATE_BIN expects source argument to be a TIMESTAMP scalar or
array"
- .to_string(),
- ));
- }
- })
-}
-
-fn date_bin_2args(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- let origin = &ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
- Some(0),
- Some("+00:00".to_owned()),
- ));
- let (stride, array) = (&args[0], &args[1]);
-
+fn date_bin_impl(
+ stride: &ColumnarValue,
+ array: &ColumnarValue,
+ origin: &ColumnarValue,
+) -> Result<ColumnarValue> {
let stride = match stride {
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
let (days, ms) = IntervalDayTimeType::to_parts(*v);