Omega359 commented on code in PR #8886:
URL: https://github.com/apache/arrow-datafusion/pull/8886#discussion_r1457844560
##########
datafusion/physical-expr/src/datetime_expressions.rs:
##########
@@ -121,58 +291,220 @@ where
}
}
+/// Given a function that maps a `&str`, `&str` pair to an arrow native type,
+/// returns a `ColumnarValue` where the function is applied to either an
+/// `ArrayRef` or a `ScalarValue`, depending on the `args`'s variants.
+///
+/// `args[0]` holds the value(s) to parse and `args[1..]` hold the candidate
+/// format string(s) (the second parameter of `op`). `op` parses one value with
+/// one format; `op2` is applied to every successfully parsed result (callers
+/// use it to rescale nanoseconds to the target time unit).
+///
+/// * If `args[0]` is a `Utf8`/`LargeUtf8` array, every argument must be a
+///   `Utf8`/`LargeUtf8` array or scalar, and the work is delegated to
+///   `strings_to_primitive_function`.
+/// * If `args[0]` is a `Utf8`/`LargeUtf8` scalar, the format arguments are
+///   examined in order and each one examined must be a `Utf8`/`LargeUtf8`
+///   scalar. NULL formats are skipped and the first successful parse is
+///   returned. A NULL input, or an input for which every format is NULL,
+///   yields a NULL scalar; if no format parses, the error from the last
+///   failing format is returned.
+///
+/// Returns an internal error when `args` is empty or when an argument has an
+/// unsupported data type.
+fn handle_multiple<'a, O, F, S, M>(
+    args: &'a [ColumnarValue],
+    op: F,
+    op2: M,
+    name: &str,
+) -> Result<ColumnarValue>
+where
+    O: ArrowPrimitiveType,
+    S: ScalarType<O::Native>,
+    F: Fn(&'a str, &'a str) -> Result<O::Native>,
+    M: Fn(O::Native) -> O::Native,
+{
+    // guard against direct calls with no arguments instead of panicking on `args[0]`
+    let first = match args.first() {
+        Some(first) => first,
+        None => {
+            return internal_err!("Unsupported 0 argument count for function {name}")
+        }
+    };
+
+    match first {
+        ColumnarValue::Array(a) => match a.data_type() {
+            DataType::Utf8 | DataType::LargeUtf8 => {
+                // validate the column types
+                for (pos, arg) in args.iter().enumerate() {
+                    match arg {
+                        ColumnarValue::Array(arg) => match arg.data_type() {
+                            DataType::Utf8 | DataType::LargeUtf8 => {
+                                // all good
+                            }
+                            other => {
+                                return internal_err!(
+                                    "Unsupported data type {other:?} for function {name}, arg # {pos}"
+                                )
+                            }
+                        },
+                        ColumnarValue::Scalar(arg) => match arg.data_type() {
+                            DataType::Utf8 | DataType::LargeUtf8 => {
+                                // all good
+                            }
+                            other => {
+                                return internal_err!(
+                                    "Unsupported data type {other:?} for function {name}, arg # {pos}"
+                                )
+                            }
+                        },
+                    }
+                }
+
+                // NOTE(review): the `i32` offset type is used for `LargeUtf8`
+                // input as well; confirm `strings_to_primitive_function` copes
+                // with `LargeUtf8` arrays (which use `i64` offsets).
+                Ok(ColumnarValue::Array(Arc::new(
+                    strings_to_primitive_function::<i32, O, _, _>(args, op, op2, name)?,
+                )))
+            }
+            other => internal_err!("Unsupported data type {other:?} for function {name}"),
+        },
+        // if the first argument is a scalar utf8 all arguments are expected
+        // to be scalar utf8
+        ColumnarValue::Scalar(scalar) => match scalar {
+            ScalarValue::Utf8(a) | ScalarValue::LargeUtf8(a) => {
+                let a = match a {
+                    Some(a) => a,
+                    // a NULL input cannot be parsed: the result is NULL
+                    None => return Ok(ColumnarValue::Scalar(S::scalar(None))),
+                };
+
+                // error produced by the most recent format that failed to parse
+                let mut err: Option<DataFusionError> = None;
+
+                // try each format in turn, returning the first successful parse
+                for (pos, v) in args.iter().enumerate().skip(1) {
+                    let format = match v {
+                        ColumnarValue::Scalar(ScalarValue::Utf8(x))
+                        | ColumnarValue::Scalar(ScalarValue::LargeUtf8(x)) => x,
+                        ColumnarValue::Scalar(s) => {
+                            return internal_err!(
+                                "Unsupported data type {s:?} for function {name}, arg # {pos}"
+                            );
+                        }
+                        ColumnarValue::Array(_) => {
+                            return internal_err!(
+                                "Unsupported data type {v:?} for function {name}, arg # {pos}"
+                            );
+                        }
+                    };
+
+                    // NULL formats are skipped
+                    if let Some(format) = format {
+                        match op(a.as_str(), format.as_str()) {
+                            Ok(r) => {
+                                return Ok(ColumnarValue::Scalar(S::scalar(Some(op2(r)))));
+                            }
+                            Err(e) => err = Some(e),
+                        }
+                    }
+                }
+
+                match err {
+                    Some(e) => Err(e),
+                    // no format was attempted because every format was NULL
+                    None => Ok(ColumnarValue::Scalar(S::scalar(None))),
+                }
+            }
+            other => internal_err!("Unsupported data type {other:?} for function {name}"),
+        },
+    }
+}
+
/// Calls string_to_timestamp_nanos and converts the error type
fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> {
string_to_timestamp_nanos(s).map_err(|e| e.into())
}
+
+/// Forwards `s` and the format `f` to `string_to_timestamp_nanos_formatted`.
+///
+/// The callee's result is already a DataFusion `Result<i64>`, so, unlike
+/// `string_to_timestamp_nanos_shim`, no error conversion is performed here.
+fn string_to_timestamp_nanos_with_format_shim(s: &str, f: &str) -> Result<i64> {
+    string_to_timestamp_nanos_formatted(s, f)
+}
+
/// to_timestamp SQL function
///
-/// Note: `to_timestamp` returns `Timestamp(Nanosecond)` though its arguments are interpreted as **seconds**. The supported range for integer input is between `-9223372037` and `9223372036`.
+/// Note: `to_timestamp` returns `Timestamp(Nanosecond)` though its arguments are interpreted as **seconds**.
+/// The supported range for integer input is between `-9223372037` and `9223372036`.
/// Supported range for string input is between `1677-09-21T00:12:44.0` and `2262-04-11T23:47:16.0`.
/// Please use `to_timestamp_seconds` for the input outside of supported bounds.
+///
+/// With one argument, the string is parsed with `string_to_timestamp_nanos_shim`.
+/// With two or more arguments, the first is the string to parse and the rest
+/// are format strings handed to `string_to_timestamp_nanos_with_format_shim`;
+/// for a scalar input the formats are tried in order and the first successful
+/// parse wins. Calling it with no arguments returns an internal error.
pub fn to_timestamp(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- handle::<TimestampNanosecondType, _, TimestampNanosecondType>(
- args,
- string_to_timestamp_nanos_shim,
- "to_timestamp",
- )
+    match args.len() {
+        0 => internal_err!("Unsupported 0 argument count for function to_timestamp"),
+        1 => handle::<TimestampNanosecondType, _, TimestampNanosecondType>(
+            args,
+            string_to_timestamp_nanos_shim,
+            "to_timestamp",
+        ),
+        _ => handle_multiple::<TimestampNanosecondType, _, TimestampNanosecondType, _>(
+            args,
+            string_to_timestamp_nanos_with_format_shim,
+            |n| n,
+            "to_timestamp",
+        ),
+    }
}
/// to_timestamp_millis SQL function
+///
+/// Returns a `Timestamp(Millisecond)`: the parsed nanosecond value is divided
+/// by `1_000_000`. A single argument is parsed with
+/// `string_to_timestamp_nanos_shim`; with two or more arguments the remaining
+/// arguments are format strings passed, together with the first, to
+/// `string_to_timestamp_nanos_with_format_shim`. No arguments is an internal
+/// error.
pub fn to_timestamp_millis(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- handle::<TimestampMillisecondType, _, TimestampMillisecondType>(
- args,
- |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000_000),
- "to_timestamp_millis",
- )
+    match args.len() {
+        0 => internal_err!("Unsupported 0 argument count for function to_timestamp_millis"),
+        1 => handle::<TimestampMillisecondType, _, TimestampMillisecondType>(
+            args,
+            |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000_000),
+            "to_timestamp_millis",
+        ),
+        _ => handle_multiple::<TimestampMillisecondType, _, TimestampMillisecondType, _>(
+            args,
+            string_to_timestamp_nanos_with_format_shim,
+            |n| n / 1_000_000,
+            "to_timestamp_millis",
+        ),
+    }
}
/// to_timestamp_micros SQL function
+///
+/// Returns a `Timestamp(Microsecond)`: the parsed nanosecond value is divided
+/// by `1_000`. A single argument is parsed with
+/// `string_to_timestamp_nanos_shim`; with two or more arguments the remaining
+/// arguments are format strings passed, together with the first, to
+/// `string_to_timestamp_nanos_with_format_shim`. No arguments is an internal
+/// error.
pub fn to_timestamp_micros(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- handle::<TimestampMicrosecondType, _, TimestampMicrosecondType>(
- args,
- |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000),
- "to_timestamp_micros",
- )
+    match args.len() {
+        0 => internal_err!("Unsupported 0 argument count for function to_timestamp_micros"),
+        1 => handle::<TimestampMicrosecondType, _, TimestampMicrosecondType>(
+            args,
+            |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000),
+            "to_timestamp_micros",
+        ),
+        _ => handle_multiple::<TimestampMicrosecondType, _, TimestampMicrosecondType, _>(
+            args,
+            string_to_timestamp_nanos_with_format_shim,
+            |n| n / 1_000,
+            "to_timestamp_micros",
+        ),
+    }
}
/// to_timestamp_nanos SQL function
+///
+/// Returns a `Timestamp(Nanosecond)`. A single argument is parsed with
+/// `string_to_timestamp_nanos_shim`; with two or more arguments the remaining
+/// arguments are format strings passed, together with the first, to
+/// `string_to_timestamp_nanos_with_format_shim`. No arguments is an internal
+/// error.
pub fn to_timestamp_nanos(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- handle::<TimestampNanosecondType, _, TimestampNanosecondType>(
- args,
- string_to_timestamp_nanos_shim,
- "to_timestamp_nanos",
- )
+    match args.len() {
+        0 => internal_err!("Unsupported 0 argument count for function to_timestamp_nanos"),
+        1 => handle::<TimestampNanosecondType, _, TimestampNanosecondType>(
+            args,
+            string_to_timestamp_nanos_shim,
+            "to_timestamp_nanos",
+        ),
+        _ => handle_multiple::<TimestampNanosecondType, _, TimestampNanosecondType, _>(
+            args,
+            string_to_timestamp_nanos_with_format_shim,
+            |n| n,
+            "to_timestamp_nanos",
+        ),
+    }
}
/// to_timestamp_seconds SQL function
pub fn to_timestamp_seconds(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- handle::<TimestampSecondType, _, TimestampSecondType>(
- args,
- |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000_000_000),
- "to_timestamp_seconds",
- )
+ match args.len() {
+ 1 => handle::<TimestampSecondType, _, TimestampSecondType>(
+ args,
+ |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000_000_000),
+ "to_timestamp_seconds",
+ ),
+ n if n >= 2 => handle_multiple::<TimestampSecondType, _,
TimestampSecondType, _>(
+ args,
+ string_to_timestamp_nanos_with_format_shim,
+ |n| n / 1_000_000_000,
+ "to_timestamp_seconds",
+ ),
+ _ => internal_err!(
+ "Unsupported 0 argument count for function to_timestamp_seconds"
+ ),
+ }
Review Comment:
That almost works - I needed to add `+ ScalarType<i64>` to the signature to
make it compile. I'll update the pull request with this improvement.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]