This is an automated email from the ASF dual-hosted git repository.
parthchandra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new b23b76033 feat: implement make_time and to_time (#4256)
b23b76033 is described below
commit b23b760332d096d0e171c0f4909c5e4adea17fde
Author: Parth Chandra <[email protected]>
AuthorDate: Wed May 20 10:58:56 2026 -0700
feat: implement make_time and to_time (#4256)
---
docs/source/user-guide/latest/expressions.md | 3 +
native/core/src/execution/columnar_to_row.rs | 39 +-
native/core/src/execution/planner.rs | 3 +
native/core/src/execution/serde.rs | 1 +
native/proto/src/proto/types.proto | 1 +
native/spark-expr/Cargo.toml | 4 +
native/spark-expr/benches/to_time.rs | 78 ++++
native/spark-expr/src/comet_scalar_funcs.rs | 13 +-
native/spark-expr/src/datetime_funcs/make_time.rs | 232 ++++++++++
native/spark-expr/src/datetime_funcs/mod.rs | 4 +
native/spark-expr/src/datetime_funcs/to_time.rs | 482 +++++++++++++++++++++
native/spark-expr/src/lib.rs | 6 +-
.../org/apache/comet/serde/QueryPlanSerde.scala | 3 +
.../main/scala/org/apache/comet/serde/hash.scala | 5 +-
.../scala/org/apache/comet/serde/literals.scala | 4 +-
.../org/apache/spark/sql/comet/util/Utils.scala | 10 +-
.../org/apache/comet/shims/CometTypeShim.scala | 3 +
.../org/apache/comet/shims/CometExprShim.scala | 35 +-
.../org/apache/comet/shims/CometExprShim.scala | 35 +-
.../org/apache/comet/shims/CometTypeShim.scala | 3 +
.../sql-tests/expressions/datetime/make_time.sql | 141 ++++++
.../sql-tests/expressions/datetime/to_time.sql | 260 +++++++++++
.../CometDatetimeExpressionBenchmark.scala | 37 ++
23 files changed, 1387 insertions(+), 15 deletions(-)
diff --git a/docs/source/user-guide/latest/expressions.md
b/docs/source/user-guide/latest/expressions.md
index ef74836d8..40d2154e8 100644
--- a/docs/source/user-guide/latest/expressions.md
+++ b/docs/source/user-guide/latest/expressions.md
@@ -116,6 +116,7 @@ of expressions that be disabled.
| LastDay | `last_day` |
| LocalTimestamp | `localtimestamp` |
| MakeDate | `make_date` |
+| MakeTime | `make_time` |
| Minute | `minute` |
| NextDay | `next_day` |
| Second | `second` |
@@ -132,6 +133,8 @@ of expressions that be disabled.
| DayOfYear | `dayofyear` |
| WeekOfYear | `weekofyear` |
| Quarter | `quarter` |
+| ToTime | `to_time` |
+| TryToTime | `try_to_time` |
## Math Expressions
diff --git a/native/core/src/execution/columnar_to_row.rs
b/native/core/src/execution/columnar_to_row.rs
index ec2b633cc..9a3616bef 100644
--- a/native/core/src/execution/columnar_to_row.rs
+++ b/native/core/src/execution/columnar_to_row.rs
@@ -161,6 +161,7 @@ enum TypedArray<'a> {
Float64(&'a Float64Array),
Date32(&'a Date32Array),
TimestampMicro(&'a TimestampMicrosecondArray),
+ Time64Nano(&'a Time64NanosecondArray),
Decimal128(&'a Decimal128Array, u8), // array + precision
String(&'a StringArray),
LargeString(&'a LargeStringArray),
@@ -200,6 +201,10 @@ impl<'a> TypedArray<'a> {
DataType::Timestamp(TimeUnit::Microsecond, _) =>
Ok(TypedArray::TimestampMicro(
downcast_array!(array, TimestampMicrosecondArray)?,
)),
+ DataType::Time64(TimeUnit::Nanosecond) =>
Ok(TypedArray::Time64Nano(downcast_array!(
+ array,
+ Time64NanosecondArray
+ )?)),
DataType::Decimal128(p, _) => Ok(TypedArray::Decimal128(
downcast_array!(array, Decimal128Array)?,
*p,
@@ -267,6 +272,7 @@ impl<'a> TypedArray<'a> {
Float64,
Date32,
TimestampMicro,
+ Time64Nano,
Decimal128,
String,
LargeString,
@@ -295,6 +301,7 @@ impl<'a> TypedArray<'a> {
TypedArray::Float64(arr) => arr.value(row_idx).to_bits() as i64,
TypedArray::Date32(arr) => arr.value(row_idx) as i64,
TypedArray::TimestampMicro(arr) => arr.value(row_idx),
+ TypedArray::Time64Nano(arr) => arr.value(row_idx),
TypedArray::Decimal128(arr, precision) if *precision <=
MAX_LONG_DIGITS => {
arr.value(row_idx) as i64
}
@@ -317,7 +324,8 @@ impl<'a> TypedArray<'a> {
| TypedArray::Float32(_)
| TypedArray::Float64(_)
| TypedArray::Date32(_)
- | TypedArray::TimestampMicro(_) => false,
+ | TypedArray::TimestampMicro(_)
+ | TypedArray::Time64Nano(_) => false,
TypedArray::Decimal128(_, precision) => *precision >
MAX_LONG_DIGITS,
_ => true,
}
@@ -380,6 +388,7 @@ enum TypedElements<'a> {
Float64(&'a Float64Array),
Date32(&'a Date32Array),
TimestampMicro(&'a TimestampMicrosecondArray),
+ Time64Nano(&'a Time64NanosecondArray),
Decimal128(&'a Decimal128Array, u8),
String(&'a StringArray),
LargeString(&'a LargeStringArray),
@@ -418,6 +427,11 @@ impl<'a> TypedElements<'a> {
return TypedElements::TimestampMicro(arr);
}
}
+ DataType::Time64(TimeUnit::Nanosecond) => {
+ if let Some(arr) =
array.as_any().downcast_ref::<Time64NanosecondArray>() {
+ return TypedElements::Time64Nano(arr);
+ }
+ }
DataType::Decimal128(p, _) => {
if let Some(arr) =
array.as_any().downcast_ref::<Decimal128Array>() {
return TypedElements::Decimal128(arr, *p);
@@ -442,6 +456,7 @@ impl<'a> TypedElements<'a> {
TypedElements::Int32(_) | TypedElements::Date32(_) |
TypedElements::Float32(_) => 4,
TypedElements::Int64(_)
| TypedElements::TimestampMicro(_)
+ | TypedElements::Time64Nano(_)
| TypedElements::Float64(_) => 8,
TypedElements::Decimal128(_, p) if *p <= MAX_LONG_DIGITS => 8,
_ => 8, // Variable-length uses 8 bytes for offset+length
@@ -460,6 +475,7 @@ impl<'a> TypedElements<'a> {
| TypedElements::Float64(_)
| TypedElements::Date32(_)
| TypedElements::TimestampMicro(_)
+ | TypedElements::Time64Nano(_)
)
}
@@ -479,6 +495,7 @@ impl<'a> TypedElements<'a> {
Float64,
Date32,
TimestampMicro,
+ Time64Nano,
Decimal128,
String,
LargeString,
@@ -502,7 +519,8 @@ impl<'a> TypedElements<'a> {
| TypedElements::Float32(_)
| TypedElements::Float64(_)
| TypedElements::Date32(_)
- | TypedElements::TimestampMicro(_) => true,
+ | TypedElements::TimestampMicro(_)
+ | TypedElements::Time64Nano(_) => true,
TypedElements::Decimal128(_, p) => *p <= MAX_LONG_DIGITS,
_ => false,
}
@@ -521,6 +539,7 @@ impl<'a> TypedElements<'a> {
TypedElements::Float64(arr) => arr.value(idx).to_bits() as i64,
TypedElements::Date32(arr) => arr.value(idx) as i64,
TypedElements::TimestampMicro(arr) => arr.value(idx),
+ TypedElements::Time64Nano(arr) => arr.value(idx),
TypedElements::Decimal128(arr, _) => arr.value(idx) as i64,
_ => 0, // Should not be called for variable-length types
}
@@ -655,6 +674,7 @@ impl<'a> TypedElements<'a> {
TypedElements::Float64(arr) => bulk_copy_range!(arr, 8),
TypedElements::Date32(arr) => bulk_copy_range!(arr, 4),
TypedElements::TimestampMicro(arr) => bulk_copy_range!(arr, 8),
+ TypedElements::Time64Nano(arr) => bulk_copy_range!(arr, 8),
_ => {} // Should not reach here due to supports_bulk_copy check
}
}
@@ -827,7 +847,8 @@ fn is_fixed_width(data_type: &DataType) -> bool {
| DataType::Float32
| DataType::Float64
| DataType::Date32
- | DataType::Timestamp(TimeUnit::Microsecond, _) => true,
+ | DataType::Timestamp(TimeUnit::Microsecond, _)
+ | DataType::Time64(TimeUnit::Nanosecond) => true,
DataType::Decimal128(p, _) => *p <= MAX_LONG_DIGITS,
_ => false,
}
@@ -1235,6 +1256,15 @@ impl ColumnarToRowContext {
TimestampMicrosecondArray,
|v: i64| v
),
+ DataType::Time64(TimeUnit::Nanosecond) =>
write_fixed_column_primitive!(
+ self,
+ array,
+ row_size,
+ field_offset_in_row,
+ num_rows,
+ Time64NanosecondArray,
+ |v: i64| v
+ ),
DataType::Decimal128(precision, _) if *precision <=
MAX_LONG_DIGITS => {
write_fixed_column_primitive!(
self,
@@ -1360,6 +1390,9 @@ fn get_field_value(data_type: &DataType, array:
&ArrayRef, row_idx: usize) -> Co
DataType::Timestamp(TimeUnit::Microsecond, _) => {
get_field_value_primitive!(array, row_idx,
TimestampMicrosecondArray, |v: i64| v)
}
+ DataType::Time64(TimeUnit::Nanosecond) => {
+ get_field_value_primitive!(array, row_idx, Time64NanosecondArray,
|v: i64| v)
+ }
DataType::Decimal128(precision, _) if *precision <= MAX_LONG_DIGITS =>
{
get_field_value_primitive!(array, row_idx, Decimal128Array, |v:
i128| v as i64)
}
diff --git a/native/core/src/execution/planner.rs
b/native/core/src/execution/planner.rs
index 75fdb03d0..41f0d3719 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -356,6 +356,9 @@ impl PhysicalPlanner {
DataType::Map(f, s) => DataType::Map(f, s).try_into()?,
DataType::List(f) => DataType::List(f).try_into()?,
DataType::Null => ScalarValue::Null,
+ DataType::Time64(TimeUnit::Nanosecond) => {
+ ScalarValue::Time64Nanosecond(None)
+ }
dt => {
return Err(GeneralError(format!("{dt:?} is not
supported in Comet")))
}
diff --git a/native/core/src/execution/serde.rs
b/native/core/src/execution/serde.rs
index 5d60288f6..d6ec6be13 100644
--- a/native/core/src/execution/serde.rs
+++ b/native/core/src/execution/serde.rs
@@ -96,6 +96,7 @@ pub fn to_arrow_datatype(dt_value: &DataType) ->
ArrowDataType {
}
DataTypeId::TimestampNtz =>
ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
DataTypeId::Date => ArrowDataType::Date32,
+ DataTypeId::Time => ArrowDataType::Time64(TimeUnit::Nanosecond),
DataTypeId::Null => ArrowDataType::Null,
DataTypeId::List => match dt_value
.type_info
diff --git a/native/proto/src/proto/types.proto
b/native/proto/src/proto/types.proto
index fec972a8f..df0c0c555 100644
--- a/native/proto/src/proto/types.proto
+++ b/native/proto/src/proto/types.proto
@@ -59,6 +59,7 @@ message DataType {
LIST = 14;
MAP = 15;
STRUCT = 16;
+ TIME = 17;
}
DataTypeId type_id = 1;
diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index 33ffc1c88..1b0359059 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -118,3 +118,7 @@ harness = false
[[bench]]
name = "map_sort"
harness = false
+
+[[bench]]
+name = "to_time"
+harness = false
diff --git a/native/spark-expr/benches/to_time.rs
b/native/spark-expr/benches/to_time.rs
new file mode 100644
index 000000000..81815fd98
--- /dev/null
+++ b/native/spark-expr/benches/to_time.rs
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::StringArray;
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::physical_plan::ColumnarValue;
+use datafusion_comet_spark_expr::spark_to_time;
+use std::sync::Arc;
+
+fn criterion_benchmark(c: &mut Criterion) {
+ let mut group = c.benchmark_group("to_time");
+
+ let hh_mm = create_string_array(10000, |i| format!("{}:{:02}", i % 24, i %
60));
+ group.bench_function("hh_mm", |b| {
+ b.iter(|| spark_to_time(std::slice::from_ref(&hh_mm), true).unwrap());
+ });
+
+ let hh_mm_ss =
+ create_string_array(10000, |i| format!("{}:{:02}:{:02}", i % 24, i %
60, i % 60));
+ group.bench_function("hh_mm_ss", |b| {
+ b.iter(|| spark_to_time(std::slice::from_ref(&hh_mm_ss),
true).unwrap());
+ });
+
+ let fractional = create_string_array(10000, |i| {
+ format!(
+ "{}:{:02}:{:02}.{:06}",
+ i % 24,
+ i % 60,
+ i % 60,
+ i * 7 % 1000000
+ )
+ });
+ group.bench_function("fractional", |b| {
+ b.iter(|| spark_to_time(std::slice::from_ref(&fractional),
true).unwrap());
+ });
+
+ let am_pm = create_string_array(10000, |i| {
+ let hour = (i % 12) + 1;
+ let suffix = if i % 2 == 0 { "AM" } else { "PM" };
+ format!("{}:{:02}:{:02} {}", hour, i % 60, i % 60, suffix)
+ });
+ group.bench_function("am_pm", |b| {
+ b.iter(|| spark_to_time(std::slice::from_ref(&am_pm), true).unwrap());
+ });
+
+ group.finish();
+}
+
+fn create_string_array(size: usize, f: impl Fn(usize) -> String) ->
ColumnarValue {
+ let values: Vec<String> = (0..size).map(&f).collect();
+ let array = StringArray::from(values);
+ ColumnarValue::Array(Arc::new(array))
+}
+
+fn config() -> Criterion {
+ Criterion::default()
+}
+
+criterion_group! {
+ name = benches;
+ config = config();
+ targets = criterion_benchmark
+}
+criterion_main!(benches);
diff --git a/native/spark-expr/src/comet_scalar_funcs.rs
b/native/spark-expr/src/comet_scalar_funcs.rs
index 9ecb11dc5..7108105dc 100644
--- a/native/spark-expr/src/comet_scalar_funcs.rs
+++ b/native/spark-expr/src/comet_scalar_funcs.rs
@@ -23,10 +23,11 @@ use crate::math_funcs::log::spark_log;
use crate::math_funcs::modulo_expr::spark_modulo;
use crate::{
spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor,
spark_isnan,
- spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round,
spark_rpad, spark_unhex,
- spark_unscaled_value, EvalMode, SparkArrayCompact, SparkArrayPositionFunc,
SparkArraysOverlap,
- SparkContains, SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc,
SparkMakeDate,
- SparkSecondsToTimestamp, SparkSizeFunc,
+ spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round,
spark_rpad,
+ spark_to_time, spark_unhex, spark_unscaled_value, EvalMode,
SparkArrayCompact,
+ SparkArrayPositionFunc, SparkArraysOverlap, SparkContains, SparkDateDiff,
+ SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, SparkMakeTime,
SparkSecondsToTimestamp,
+ SparkSizeFunc,
};
use arrow::datatypes::DataType;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -196,6 +197,9 @@ pub fn create_comet_physical_fun_with_eval_mode(
let func = Arc::new(spark_map_sort);
make_comet_scalar_udf!("spark_map_sort", func, without data_type)
}
+ "to_time" => {
+ make_comet_scalar_udf!("to_time", spark_to_time, without
data_type, fail_on_error)
+ }
_ => registry.udf(fun_name).map_err(|e| {
DataFusionError::Execution(format!(
"Function {fun_name} not found in the registry: {e}",
@@ -214,6 +218,7 @@ fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
Arc::new(ScalarUDF::new_from_impl(SparkDateFromUnixDate::default())),
Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
Arc::new(ScalarUDF::new_from_impl(SparkMakeDate::default())),
+ Arc::new(ScalarUDF::new_from_impl(SparkMakeTime::default())),
Arc::new(ScalarUDF::new_from_impl(SparkSecondsToTimestamp::default())),
Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())),
]
diff --git a/native/spark-expr/src/datetime_funcs/make_time.rs
b/native/spark-expr/src/datetime_funcs/make_time.rs
new file mode 100644
index 000000000..154ef3bf4
--- /dev/null
+++ b/native/spark-expr/src/datetime_funcs/make_time.rs
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, Decimal128Array, Int32Array, Time64NanosecondArray};
+use arrow::compute::cast;
+use arrow::datatypes::{DataType, TimeUnit};
+use datafusion::common::{utils::take_function_args, DataFusionError, Result};
+use datafusion::logical_expr::{
+ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+use std::any::Any;
+use std::sync::Arc;
+
+const MICROS_PER_SECOND: i128 = 1_000_000;
+const NANOS_PER_MICRO: i64 = 1_000;
+const NANOS_PER_SECOND: i64 = 1_000_000_000;
+
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkMakeTime {
+ signature: Signature,
+}
+
+impl SparkMakeTime {
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::any(3, Volatility::Immutable),
+ }
+ }
+}
+
+impl Default for SparkMakeTime {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// Converts hours, minutes, and fractional seconds (Decimal(16,6)) to
nanoseconds from midnight.
+/// Returns an error for invalid inputs (matching Spark's always-throw
behavior).
+fn make_time(hours: i32, minutes: i32, secs_and_micros_unscaled: i128) ->
Result<i64> {
+ let full_secs = secs_and_micros_unscaled.div_euclid(MICROS_PER_SECOND);
+ let frac_micros = secs_and_micros_unscaled.rem_euclid(MICROS_PER_SECOND);
+
+ if full_secs > i32::MAX as i128 || full_secs < 0 {
+ return Err(DataFusionError::Execution(format!(
+ "Invalid value for SecondOfMinute (valid values 0 - 59): {}",
+ secs_and_micros_unscaled / MICROS_PER_SECOND
+ )));
+ }
+
+ let secs = full_secs as i32;
+ let nanos = (frac_micros as i64) * NANOS_PER_MICRO;
+
+ if !(0..=23).contains(&hours) {
+ return Err(DataFusionError::Execution(format!(
+ "Invalid value for HourOfDay (valid values 0 - 23): {hours}"
+ )));
+ }
+ if !(0..=59).contains(&minutes) {
+ return Err(DataFusionError::Execution(format!(
+ "Invalid value for MinuteOfHour (valid values 0 - 59): {minutes}"
+ )));
+ }
+ if !(0..=59).contains(&secs) {
+ return Err(DataFusionError::Execution(format!(
+ "Invalid value for SecondOfMinute (valid values 0 - 59): {secs}"
+ )));
+ }
+
+ let total_nanos = hours as i64 * 3_600 * NANOS_PER_SECOND
+ + minutes as i64 * 60 * NANOS_PER_SECOND
+ + secs as i64 * NANOS_PER_SECOND
+ + nanos;
+
+ Ok(total_nanos)
+}
+
+impl ScalarUDFImpl for SparkMakeTime {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "make_time"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _: &[DataType]) -> Result<DataType> {
+ Ok(DataType::Time64(TimeUnit::Nanosecond))
+ }
+
+ fn invoke_with_args(&self, args: ScalarFunctionArgs) ->
Result<ColumnarValue> {
+ let [hours, minutes, secs_and_micros] =
take_function_args(self.name(), args.args)?;
+
+ let num_rows = [&hours, &minutes, &secs_and_micros]
+ .iter()
+ .find_map(|arg| match arg {
+ ColumnarValue::Array(array) => Some(array.len()),
+ ColumnarValue::Scalar(_) => None,
+ })
+ .unwrap_or(1);
+
+ let hours_arr = hours.into_array(num_rows)?;
+ let minutes_arr = minutes.into_array(num_rows)?;
+ let secs_arr = secs_and_micros.into_array(num_rows)?;
+
+ let hours_arr = cast_to_int32(&hours_arr)?;
+ let minutes_arr = cast_to_int32(&minutes_arr)?;
+
+ let hours_array = hours_arr
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .ok_or_else(|| {
+ DataFusionError::Execution("make_time: failed to cast hours to
Int32".to_string())
+ })?;
+
+ let minutes_array = minutes_arr
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .ok_or_else(|| {
+ DataFusionError::Execution("make_time: failed to cast minutes
to Int32".to_string())
+ })?;
+
+ let secs_array = secs_arr
+ .as_any()
+ .downcast_ref::<Decimal128Array>()
+ .ok_or_else(|| {
+ DataFusionError::Execution(
+ "make_time: expected Decimal128 for seconds
argument".to_string(),
+ )
+ })?;
+
+ let len = hours_array.len();
+ let mut builder = Time64NanosecondArray::builder(len);
+
+ for i in 0..len {
+ if hours_array.is_null(i) || minutes_array.is_null(i) ||
secs_array.is_null(i) {
+ builder.append_null();
+ } else {
+ let h = hours_array.value(i);
+ let m = minutes_array.value(i);
+ let s = secs_array.value(i);
+
+ let nanos = make_time(h, m, s)?;
+ builder.append_value(nanos);
+ }
+ }
+
+ Ok(ColumnarValue::Array(Arc::new(builder.finish())))
+ }
+}
+
+fn cast_to_int32(arr: &Arc<dyn Array>) -> Result<Arc<dyn Array>> {
+ if arr.data_type() == &DataType::Int32 {
+ Ok(Arc::clone(arr))
+ } else {
+ cast(arr.as_ref(), &DataType::Int32)
+ .map_err(|e| DataFusionError::Execution(format!("Failed to cast to
Int32: {e}")))
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;

    /// Unscaled Decimal(16,6) value for `secs` whole seconds plus `micros` microseconds.
    fn unscaled(secs: i128, micros: i128) -> i128 {
        secs * MICROS_PER_SECOND + micros
    }

    #[test]
    fn test_make_time_valid() {
        let cases: [(i32, i32, i128, i64); 7] = [
            (0, 0, 0, 0),                                // midnight
            (1, 0, 0, 3_600_000_000_000),                // one hour
            (0, 1, 0, 60_000_000_000),                   // one minute
            (0, 0, unscaled(1, 0), 1_000_000_000),       // one second
            (0, 0, unscaled(1, 500_000), 1_500_000_000), // 1.5 seconds
            (23, 59, unscaled(59, 999_999), 86_399_999_999_000),
            (
                12,
                30,
                unscaled(45, 123_456),
                12 * 3_600_000_000_000 + 30 * 60_000_000_000 + 45_123_456_000,
            ),
        ];
        for (hours, minutes, secs, expected) in cases {
            assert_eq!(
                make_time(hours, minutes, secs).unwrap(),
                expected,
                "make_time({hours}, {minutes}, {secs})"
            );
        }
    }

    #[test]
    fn test_make_time_invalid_hours() {
        for hours in [24, 25, -1] {
            assert!(make_time(hours, 0, 0).is_err(), "hour {hours} should be rejected");
        }
    }

    #[test]
    fn test_make_time_invalid_minutes() {
        for minutes in [60, -1] {
            assert!(make_time(0, minutes, 0).is_err(), "minute {minutes} should be rejected");
        }
    }

    #[test]
    fn test_make_time_invalid_seconds() {
        // 60 s, 100.5 s and -1 s
        for secs in [unscaled(60, 0), unscaled(100, 500_000), unscaled(-1, 0)] {
            assert!(make_time(0, 0, secs).is_err(), "seconds {secs} should be rejected");
        }
    }

    #[test]
    fn test_make_time_overflow_seconds() {
        // Whole seconds one past i32::MAX
        let large = unscaled(i32::MAX as i128 + 1, 0);
        assert!(make_time(0, 0, large).is_err());
    }
}
diff --git a/native/spark-expr/src/datetime_funcs/mod.rs
b/native/spark-expr/src/datetime_funcs/mod.rs
index a94bf16ce..53a1f185f 100644
--- a/native/spark-expr/src/datetime_funcs/mod.rs
+++ b/native/spark-expr/src/datetime_funcs/mod.rs
@@ -21,8 +21,10 @@ mod date_trunc;
mod extract_date_part;
mod hours;
mod make_date;
+mod make_time;
mod seconds_to_timestamp;
mod timestamp_trunc;
+mod to_time;
mod unix_timestamp;
pub use date_diff::SparkDateDiff;
@@ -33,6 +35,8 @@ pub use extract_date_part::SparkMinute;
pub use extract_date_part::SparkSecond;
pub use hours::SparkHoursTransform;
pub use make_date::SparkMakeDate;
+pub use make_time::SparkMakeTime;
pub use seconds_to_timestamp::SparkSecondsToTimestamp;
pub use timestamp_trunc::TimestampTruncExpr;
+pub use to_time::spark_to_time;
pub use unix_timestamp::SparkUnixTimestamp;
diff --git a/native/spark-expr/src/datetime_funcs/to_time.rs
b/native/spark-expr/src/datetime_funcs/to_time.rs
new file mode 100644
index 000000000..727998fdf
--- /dev/null
+++ b/native/spark-expr/src/datetime_funcs/to_time.rs
@@ -0,0 +1,482 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, StringArray, Time64NanosecondArray};
+use datafusion::common::{DataFusionError, Result};
+use datafusion::physical_plan::ColumnarValue;
+use std::sync::Arc;
+
+const NANOS_PER_MICRO: i64 = 1_000;
+const NANOS_PER_SECOND: i64 = 1_000_000_000;
+const NANOS_PER_MINUTE: i64 = 60 * NANOS_PER_SECOND;
+const NANOS_PER_HOUR: i64 = 60 * NANOS_PER_MINUTE;
+
+/// Spark-compatible to_time: parse a string to time (nanoseconds from
midnight).
+/// When fail_on_error is true (to_time), returns an error for unparseable
input.
+/// When fail_on_error is false (try_to_time), returns null for unparseable
input.
+pub fn spark_to_time(args: &[ColumnarValue], fail_on_error: bool) ->
Result<ColumnarValue> {
+ if args.is_empty() {
+ return Err(DataFusionError::Execution(
+ "to_time requires at least 1 argument".to_string(),
+ ));
+ }
+
+ let num_rows = args
+ .iter()
+ .find_map(|arg| match arg {
+ ColumnarValue::Array(array) => Some(array.len()),
+ ColumnarValue::Scalar(_) => None,
+ })
+ .unwrap_or(1);
+
+ let str_arr = args[0].clone().into_array(num_rows)?;
+ let str_array = str_arr
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .ok_or_else(|| {
+ DataFusionError::Execution("to_time: expected String
argument".to_string())
+ })?;
+
+ let len = str_array.len();
+ let mut builder = Time64NanosecondArray::builder(len);
+
+ for i in 0..len {
+ if str_array.is_null(i) {
+ builder.append_null();
+ } else {
+ let s = str_array.value(i);
+ match string_to_time(s) {
+ Some(nanos) => builder.append_value(nanos),
+ None => {
+ if fail_on_error {
+ return Err(DataFusionError::Execution(format!(
+ "The input string '{}' cannot be parsed to a TIME
value",
+ s
+ )));
+ }
+ builder.append_null();
+ }
+ }
+ }
+ }
+
+ Ok(ColumnarValue::Array(Arc::new(builder.finish())))
+}
+
+/// Parse a time string to nanoseconds from midnight, matching Spark's
stringToTime behavior.
+/// Returns None for invalid input.
+fn string_to_time(s: &str) -> Option<i64> {
+ let trimmed = s.trim();
+ if trimmed.is_empty() {
+ return None;
+ }
+
+ // Spark's parseTimestampString gates the T-prefix branch on j == 0 (start
of
+ // the trimmed string), so " T12:30" is rejected even though leading
whitespace
+ // is trimmed: the original segment start differs from the trimmed
position.
+ if trimmed.as_bytes()[0] == b'T' && s.as_bytes()[0].is_ascii_whitespace() {
+ return None;
+ }
+
+ let bytes = trimmed.as_bytes();
+ let num_chars = bytes.len();
+
+ // Detect AM/PM suffix
+ let (is_am, is_pm, has_suffix) = if num_chars > 2 {
+ let last = bytes[num_chars - 1];
+ if last == b'M' || last == b'm' {
+ let second_last = bytes[num_chars - 2];
+ let am = second_last == b'A' || second_last == b'a';
+ let pm = second_last == b'P' || second_last == b'p';
+ (am, pm, am || pm)
+ } else {
+ (false, false, false)
+ }
+ } else {
+ (false, false, false)
+ };
+
+ // Strip AM/PM suffix (and optional space before it)
+ let time_str = if has_suffix {
+ let end = num_chars - 2;
+ let s = &trimmed[..end];
+ s.trim_end()
+ } else {
+ trimmed
+ };
+
+ // Parse the time components
+ let (hour, minute, second, micros) = parse_time_components(time_str)?;
+
+ // Validate and convert hours
+ let hr = if !has_suffix {
+ if hour > 23 {
+ return None;
+ }
+ hour
+ } else {
+ if !(1..=12).contains(&hour) {
+ return None;
+ }
+ if is_am {
+ if hour == 12 {
+ 0
+ } else {
+ hour
+ }
+ } else if is_pm {
+ if hour == 12 {
+ 12
+ } else {
+ hour + 12
+ }
+ } else {
+ return None;
+ }
+ };
+
+ // Validate minutes and seconds
+ if minute > 59 || second > 59 {
+ return None;
+ }
+
+ let nanos = hr as i64 * NANOS_PER_HOUR
+ + minute as i64 * NANOS_PER_MINUTE
+ + second as i64 * NANOS_PER_SECOND
+ + micros as i64 * NANOS_PER_MICRO;
+
+ Some(nanos)
+}
+
+/// Parse time components from a string like "HH:mm:ss.ffffff" or "T HH:mm:ss".
+/// Returns (hour, minute, second, microseconds) or None if invalid.
+fn parse_time_components(s: &str) -> Option<(i32, i32, i32, i32)> {
+ let bytes = s.as_bytes();
+ if bytes.is_empty() {
+ return None;
+ }
+
+ let mut pos = 0;
+
+ // Skip optional 'T' prefix
+ if bytes[pos] == b'T' {
+ pos += 1;
+ }
+
+ // Parse hour
+ let (hour, new_pos) = parse_digits(bytes, pos)?;
+ pos = new_pos;
+ if hour < 0 {
+ return None;
+ }
+
+ // Expect ':'
+ if pos >= bytes.len() || bytes[pos] != b':' {
+ return None;
+ }
+ pos += 1;
+
+ // Parse minute
+ let (minute, new_pos) = parse_digits(bytes, pos)?;
+ pos = new_pos;
+
+ // Optional seconds
+ if pos >= bytes.len() {
+ return Some((hour, minute, 0, 0));
+ }
+
+ if bytes[pos] != b':' {
+ return None;
+ }
+ pos += 1;
+
+ // Parse seconds
+ let (second, new_pos) = parse_digits(bytes, pos)?;
+ pos = new_pos;
+
+ // Optional fractional seconds
+ if pos >= bytes.len() {
+ return Some((hour, minute, second, 0));
+ }
+
+ if bytes[pos] != b'.' {
+ // No more content allowed (timezone would invalidate)
+ return None;
+ }
+ pos += 1;
+
+ // Parse fractional seconds (up to 6 digits, pad with zeros)
+ let (micros, new_pos) = parse_fractional(bytes, pos)?;
+ pos = new_pos;
+
+ // Nothing should follow the fractional seconds (timezone not allowed for
time)
+ if pos < bytes.len() {
+ return None;
+ }
+
+ Some((hour, minute, second, micros))
+}
+
/// Parses a run of ASCII digits starting at `start`.
///
/// Returns `(value, end)`, where `end` is the index of the first non-digit
/// byte (or `bytes.len()`). Time fields are 1 or 2 digits, so an empty run or a
/// run of 3+ digits yields `None`. Parsing stops as soon as a third digit is
/// seen, so an arbitrarily long digit run cannot overflow the i32 accumulator.
fn parse_digits(bytes: &[u8], start: usize) -> Option<(i32, usize)> {
    let mut value: i32 = 0;
    let mut pos = start;

    while let Some(&b) = bytes.get(pos) {
        if !b.is_ascii_digit() {
            break;
        }
        if pos - start == 2 {
            // A third consecutive digit: the field is too long.
            return None;
        }
        value = value * 10 + i32::from(b - b'0');
        pos += 1;
    }

    if pos == start {
        return None;
    }
    Some((value, pos))
}
+
/// Reads a fractional-seconds field starting at `start` and returns
/// `(microseconds, end)`.
///
/// At least one digit is required. Only the first six digits are significant.
/// Shorter fractions are right-padded with zeros (".5" -> 500000). Additional
/// digits are consumed but ignored (truncation, not rounding). `end` is the
/// index of the first non-digit byte after the whole digit run.
fn parse_fractional(bytes: &[u8], start: usize) -> Option<(i32, usize)> {
    let run = bytes
        .get(start..)
        .unwrap_or_default()
        .iter()
        .take_while(|b| b.is_ascii_digit())
        .count();
    if run == 0 {
        return None;
    }

    let significant = &bytes[start..start + run.min(6)];
    let value = significant
        .iter()
        .fold(0i32, |acc, &d| acc * 10 + i32::from(d - b'0'));
    let micros = value * 10i32.pow((6 - significant.len()) as u32);

    Some((micros, start + run))
}
+
#[cfg(test)]
mod tests {
    use super::*;

    /// Nanoseconds since midnight for the given wall-clock components.
    fn hms(h: i64, m: i64, s: i64, micros: i64) -> i64 {
        h * NANOS_PER_HOUR + m * NANOS_PER_MINUTE + s * NANOS_PER_SECOND + micros * NANOS_PER_MICRO
    }

    /// Asserts that each input parses to its expected value.
    fn check(cases: &[(&str, Option<i64>)]) {
        for (input, expected) in cases {
            assert_eq!(string_to_time(input), *expected, "input: {input:?}");
        }
    }

    /// Asserts that both inputs of each pair parse to the same result.
    fn check_same(pairs: &[(&str, &str)]) {
        for (a, b) in pairs {
            assert_eq!(string_to_time(a), string_to_time(b), "{a:?} vs {b:?}");
        }
    }

    #[test]
    fn test_basic_time_parsing() {
        check(&[
            // HH:mm
            ("00:00", Some(0)),
            ("12:30", Some(hms(12, 30, 0, 0))),
            ("23:59", Some(hms(23, 59, 0, 0))),
            // HH:mm:ss
            ("12:30:45", Some(hms(12, 30, 45, 0))),
            ("00:00:00", Some(0)),
            ("23:59:59", Some(hms(23, 59, 59, 0))),
        ]);
    }

    #[test]
    fn test_fractional_seconds() {
        check(&[
            ("00:00:00.1", Some(hms(0, 0, 0, 100_000))),
            ("00:00:00.001", Some(hms(0, 0, 0, 1_000))),
            ("00:00:00.000001", Some(hms(0, 0, 0, 1))),
            // More than six digits are truncated to microseconds.
            ("00:00:00.1234567", Some(hms(0, 0, 0, 123_456))),
            ("23:59:59.999999", Some(hms(23, 59, 59, 999_999))),
        ]);
    }

    #[test]
    fn test_single_digit_components() {
        check(&[
            ("1:2:3", Some(hms(1, 2, 3, 0))),
            ("1:2:3.04", Some(hms(1, 2, 3, 40_000))),
        ]);
    }

    #[test]
    fn test_t_prefix() {
        check(&[
            ("T1:02:3.04", Some(hms(1, 2, 3, 40_000))),
            ("T12:30:45", Some(hms(12, 30, 45, 0))),
        ]);
    }

    #[test]
    fn test_am_pm() {
        check(&[
            ("12:00:00 AM", Some(0)), // midnight
            ("1:00:00 AM", Some(hms(1, 0, 0, 0))),
            ("11:59:59 AM", Some(hms(11, 59, 59, 0))),
            ("12:00:00 PM", Some(hms(12, 0, 0, 0))), // noon
            ("1:00:00 PM", Some(hms(13, 0, 0, 0))),
            ("11:59:59 PM", Some(hms(23, 59, 59, 0))),
            // The marker is case-insensitive.
            ("12:00:00 am", Some(0)),
            ("12:00:00 pm", Some(hms(12, 0, 0, 0))),
            // No space before the marker.
            ("12:00:00AM", Some(0)),
            ("1:00:00PM", Some(hms(13, 0, 0, 0))),
            // With fractional seconds.
            ("12:59:59.999999 PM", Some(hms(12, 59, 59, 999_999))),
        ]);
    }

    #[test]
    fn test_invalid_am_pm() {
        // The 12-hour clock only accepts hours 1..=12.
        check(&[
            ("0:00:00 AM", None),
            ("13:00:00 AM", None),
            ("13:00:00 PM", None),
        ]);
    }

    #[test]
    fn test_invalid_inputs() {
        check(&[
            ("", None),
            (" ", None),
            ("XYZ", None),
            ("24:00:00", None),
            ("23:60:00", None),
            ("23:00:60", None),
            ("2025-03-09 00:00:00", None), // date component present
            ("00:01:02 UTC", None),        // time zone present
            ("120000", None),              // digits without separators
        ]);
    }

    #[test]
    fn test_trailing_whitespace() {
        check_same(&[("12:30:45 ", "12:30:45"), ("1:00:00 AM ", "1:00:00 AM")]);
    }

    #[test]
    fn test_three_digit_components() {
        // Hour, minute and second are limited to 1-2 digits.
        check(&[
            ("001:02:03", None),
            ("12:001:03", None),
            ("12:02:003", None),
        ]);
    }

    #[test]
    fn test_leading_whitespace() {
        check_same(&[
            (" 12:30", "12:30"),
            (" 12:30:45", "12:30:45"),
            (" 12:30:45 ", "12:30:45"),
            (" 1:00:00 AM", "1:00:00 AM"),
            // Tabs and newlines are trimmed as well.
            ("\t12:30:45", "12:30:45"),
            ("\n12:30:45", "12:30:45"),
        ]);
        // A 'T' prefix only counts at the very start of the raw input, so any
        // leading whitespace before it makes the string invalid.
        check(&[(" T12:30:45", None), (" T12:30", None)]);
    }
}
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 4f2d5da46..f60f01f48 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -75,9 +75,9 @@ pub use comet_scalar_funcs::{
};
pub use csv_funcs::*;
pub use datetime_funcs::{
- SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkHour,
SparkHoursTransform,
- SparkMakeDate, SparkMinute, SparkSecond, SparkSecondsToTimestamp,
SparkUnixTimestamp,
- TimestampTruncExpr,
+ spark_to_time, SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc,
SparkHour,
+ SparkHoursTransform, SparkMakeDate, SparkMakeTime, SparkMinute,
SparkSecond,
+ SparkSecondsToTimestamp, SparkUnixTimestamp, TimestampTruncExpr,
};
pub use error::{decimal_overflow_error, SparkError, SparkErrorWithContext,
SparkResult};
pub use hash_funcs::*;
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index bfdc78226..5ecf14db3 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -368,6 +368,8 @@ object QueryPlanSerde extends Logging with CometExprShim
with CometTypeShim {
_: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _:
TimestampNTZType |
_: DecimalType | _: DateType | _: BooleanType | _: NullType =>
true
+ case dt if isTimeType(dt) =>
+ true
case s: StructType if allowComplex =>
s.fields.nonEmpty &&
s.fields.map(_.dataType).forall(supportedDataType(_, allowComplex))
case a: ArrayType if allowComplex =>
@@ -402,6 +404,7 @@ object QueryPlanSerde extends Logging with CometExprShim
with CometTypeShim {
case _: ArrayType => 14
case _: MapType => 15
case _: StructType => 16
+ case dt if isTimeType(dt) => 17
case dt =>
logWarning(s"Cannot serialize Spark data type: $dt")
return None
diff --git a/spark/src/main/scala/org/apache/comet/serde/hash.scala
b/spark/src/main/scala/org/apache/comet/serde/hash.scala
index b05919973..a58e81b02 100644
--- a/spark/src/main/scala/org/apache/comet/serde/hash.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/hash.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute,
Expression, Murmur3
import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType,
IntegerType, LongType, MapType, StringType, StructType}
import org.apache.comet.CometSparkSessionExtensions.withInfo
-import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal,
scalarFunctionExprToProtoWithReturnType, serializeDataType, supportedDataType}
+import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, isTimeType,
scalarFunctionExprToProtoWithReturnType, serializeDataType, supportedDataType}
object CometXxHash64 extends CometExpressionSerde[XxHash64] {
override def convert(
@@ -126,6 +126,9 @@ private object HashUtils {
isSupportedDataType(expr, a.elementType)
case m: MapType =>
isSupportedDataType(expr, m.keyType) && isSupportedDataType(expr,
m.valueType)
+ case dt if isTimeType(dt) =>
+ withInfo(expr, s"Unsupported datatype $dt")
+ false
case _ if !supportedDataType(dt, allowComplex = true) =>
withInfo(expr, s"Unsupported datatype $dt")
false
diff --git a/spark/src/main/scala/org/apache/comet/serde/literals.scala
b/spark/src/main/scala/org/apache/comet/serde/literals.scala
index c81b146a8..5b03985c0 100644
--- a/spark/src/main/scala/org/apache/comet/serde/literals.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/literals.scala
@@ -32,7 +32,7 @@ import com.google.protobuf.ByteString
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.DataTypeSupport.isComplexType
import org.apache.comet.serde.{CometExpressionSerde, Compatible,
ExprOuterClass, LiteralOuterClass, SupportLevel, Unsupported}
-import org.apache.comet.serde.QueryPlanSerde.{serializeDataType,
supportedDataType}
+import org.apache.comet.serde.QueryPlanSerde.{isTimeType, serializeDataType,
supportedDataType}
import org.apache.comet.serde.Types.ListLiteral
object CometLiteral extends CometExpressionSerde[Literal] with Logging {
@@ -80,6 +80,8 @@ object CometLiteral extends CometExpressionSerde[Literal]
with Logging {
case _: IntegerType | _: DateType =>
exprBuilder.setIntVal(value.asInstanceOf[Int])
case _: LongType | _: TimestampType | _: TimestampNTZType =>
exprBuilder.setLongVal(value.asInstanceOf[Long])
+ case dt if isTimeType(dt) =>
+ exprBuilder.setLongVal(value.asInstanceOf[Long])
case _: FloatType => exprBuilder.setFloatVal(value.asInstanceOf[Float])
case _: DoubleType =>
exprBuilder.setDoubleVal(value.asInstanceOf[Double])
case _: StringType =>
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
b/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
index 78f2e81c7..783367c05 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
@@ -108,6 +108,12 @@ object Utils extends CometTypeShim with Logging {
case yi: ArrowType.Interval if yi.getUnit == IntervalUnit.YEAR_MONTH =>
YearMonthIntervalType()
case di: ArrowType.Interval if di.getUnit == IntervalUnit.DAY_TIME =>
DayTimeIntervalType()
+ case t: ArrowType.Time if t.getUnit == TimeUnit.NANOSECOND &&
t.getBitWidth == 64 =>
+ // scalastyle:off classforname
+ val clazz = Class.forName("org.apache.spark.sql.types.TimeType$")
+ // scalastyle:on classforname
+ val module = clazz.getField("MODULE$").get(null)
+ clazz.getMethod("apply").invoke(module).asInstanceOf[DataType]
case _ => throw new UnsupportedOperationException(s"Unsupported data type:
${dt.toString}")
}
@@ -142,6 +148,8 @@ object Utils extends CometTypeShim with Logging {
}
case TimestampNTZType =>
new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)
+ case dt if isTimeType(dt) =>
+ new ArrowType.Time(TimeUnit.NANOSECOND, 64)
case _ =>
throw new UnsupportedOperationException(
s"Unsupported data type: [${dt.getClass.getName}]
${dt.catalogString}")
@@ -393,7 +401,7 @@ object Utils extends CometTypeShim with Logging {
_: BigIntVector | _: Float4Vector | _: Float8Vector | _:
VarCharVector |
_: DecimalVector | _: DateDayVector | _: TimeStampMicroTZVector | _:
VarBinaryVector |
_: FixedSizeBinaryVector | _: TimeStampMicroVector | _: StructVector
| _: ListVector |
- _: MapVector | _: NullVector) =>
+ _: MapVector | _: NullVector | _: TimeNanoVector) =>
v.asInstanceOf[FieldVector]
case _ =>
throw new SparkException(s"Unsupported Arrow Vector for $reason:
${valueVector.getClass}")
diff --git
a/spark/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala
b/spark/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala
index 5c47a57a2..041eb3ce6 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala
@@ -32,4 +32,7 @@ trait CometTypeShim {
@nowarn // Spark 4 feature; Variant shredding doesn't exist in Spark 3.x.
def isVariantStruct(s: StructType): Boolean = false
+
+ /** Always false: `TimeType` only exists from Spark 4.1 onwards, so no Spark 3.x type matches. */
+ @nowarn // Spark 4.1 feature; TimeType doesn't exist in Spark 3.x.
+ def isTimeType(dt: DataType): Boolean = false
}
diff --git
a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala
b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala
index 5e906a0d8..88e6f27f9 100644
--- a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala
+++ b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala
@@ -23,9 +23,10 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.types.StringTypeWithCollation
-import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType,
DataTypes, MapType, StringType}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType,
DataTypes, MapType, StringType, TimeType}
import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -92,6 +93,16 @@ trait CometExprShim extends CommonStringExprs {
val Seq(bin, charset, _, _) = s.arguments
stringDecode(expr, charset, bin, inputs, binding)
+ case s: StaticInvoke
+ if s.staticObject == classOf[DateTimeUtils.type] &&
+ s.functionName == "makeTime" &&
+ s.arguments.size == 3 &&
+ s.dataType.isInstanceOf[TimeType] =>
+ val childExprs = s.arguments.map(exprToProtoInternal(_, inputs,
binding))
+ val optExpr =
+ scalarFunctionExprToProtoWithReturnType("make_time", s.dataType,
true, childExprs: _*)
+ optExprWithInfo(optExpr, expr, s.arguments: _*)
+
case expr @ ToPrettyString(child, timeZoneId) =>
val castSupported = CometCast.isSupported(
child.dataType,
@@ -135,6 +146,7 @@ trait CometExprShim extends CommonStringExprs {
// In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement
is
// Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct
the
// original StructsToJson and recurse so support-level checks apply.
+ // ToTime (Spark 4.1) resolves to Invoke(Literal(ToTimeParser), "parse",
TimeType(), ...).
case i: Invoke =>
(i.targetObject, i.functionName, i.arguments) match {
case (Literal(evaluator: StructsToJsonEvaluator, _), "evaluate",
Seq(child)) =>
@@ -142,6 +154,27 @@ trait CometExprShim extends CommonStringExprs {
StructsToJson(evaluator.options, child, evaluator.timeZoneId),
inputs,
binding)
+ case (Literal(parser: ToTimeParser, _), "parse", args)
+ if i.dataType.isInstanceOf[TimeType] && parser.fmt.isEmpty &&
args.size == 1 =>
+ val childExprs = args.map(exprToProtoInternal(_, inputs, binding))
+ val optExpr =
+ scalarFunctionExprToProtoWithReturnType("to_time", i.dataType,
true, childExprs: _*)
+ optExprWithInfo(optExpr, i, args: _*)
+ case _ => None
+ }
+
+ // try_to_time resolves to TryEval(Invoke(Literal(ToTimeParser),
"parse", ...))
+ case TryEval(i: Invoke) =>
+ (i.targetObject, i.functionName, i.arguments) match {
+ case (Literal(parser: ToTimeParser, _), "parse", args)
+ if i.dataType.isInstanceOf[TimeType] && parser.fmt.isEmpty &&
args.size == 1 =>
+ val childExprs = args.map(exprToProtoInternal(_, inputs, binding))
+ val optExpr = scalarFunctionExprToProtoWithReturnType(
+ "to_time",
+ i.dataType,
+ false,
+ childExprs: _*)
+ optExprWithInfo(optExpr, expr, args: _*)
case _ => None
}
diff --git
a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala
b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala
index 5e906a0d8..88e6f27f9 100644
--- a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala
+++ b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala
@@ -23,9 +23,10 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.types.StringTypeWithCollation
-import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType,
DataTypes, MapType, StringType}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType,
DataTypes, MapType, StringType, TimeType}
import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -92,6 +93,16 @@ trait CometExprShim extends CommonStringExprs {
val Seq(bin, charset, _, _) = s.arguments
stringDecode(expr, charset, bin, inputs, binding)
+ case s: StaticInvoke
+ if s.staticObject == classOf[DateTimeUtils.type] &&
+ s.functionName == "makeTime" &&
+ s.arguments.size == 3 &&
+ s.dataType.isInstanceOf[TimeType] =>
+ val childExprs = s.arguments.map(exprToProtoInternal(_, inputs,
binding))
+ val optExpr =
+ scalarFunctionExprToProtoWithReturnType("make_time", s.dataType,
true, childExprs: _*)
+ optExprWithInfo(optExpr, expr, s.arguments: _*)
+
case expr @ ToPrettyString(child, timeZoneId) =>
val castSupported = CometCast.isSupported(
child.dataType,
@@ -135,6 +146,7 @@ trait CometExprShim extends CommonStringExprs {
// In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement
is
// Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct
the
// original StructsToJson and recurse so support-level checks apply.
+ // ToTime (Spark 4.1) resolves to Invoke(Literal(ToTimeParser), "parse",
TimeType(), ...).
case i: Invoke =>
(i.targetObject, i.functionName, i.arguments) match {
case (Literal(evaluator: StructsToJsonEvaluator, _), "evaluate",
Seq(child)) =>
@@ -142,6 +154,27 @@ trait CometExprShim extends CommonStringExprs {
StructsToJson(evaluator.options, child, evaluator.timeZoneId),
inputs,
binding)
+ case (Literal(parser: ToTimeParser, _), "parse", args)
+ if i.dataType.isInstanceOf[TimeType] && parser.fmt.isEmpty &&
args.size == 1 =>
+ val childExprs = args.map(exprToProtoInternal(_, inputs, binding))
+ val optExpr =
+ scalarFunctionExprToProtoWithReturnType("to_time", i.dataType,
true, childExprs: _*)
+ optExprWithInfo(optExpr, i, args: _*)
+ case _ => None
+ }
+
+ // try_to_time resolves to TryEval(Invoke(Literal(ToTimeParser),
"parse", ...))
+ case TryEval(i: Invoke) =>
+ (i.targetObject, i.functionName, i.arguments) match {
+ case (Literal(parser: ToTimeParser, _), "parse", args)
+ if i.dataType.isInstanceOf[TimeType] && parser.fmt.isEmpty &&
args.size == 1 =>
+ val childExprs = args.map(exprToProtoInternal(_, inputs, binding))
+ val optExpr = scalarFunctionExprToProtoWithReturnType(
+ "to_time",
+ i.dataType,
+ false,
+ childExprs: _*)
+ optExprWithInfo(optExpr, expr, args: _*)
case _ => None
}
diff --git
a/spark/src/main/spark-4.x/org/apache/comet/shims/CometTypeShim.scala
b/spark/src/main/spark-4.x/org/apache/comet/shims/CometTypeShim.scala
index cc287a9b9..17b2738b3 100644
--- a/spark/src/main/spark-4.x/org/apache/comet/shims/CometTypeShim.scala
+++ b/spark/src/main/spark-4.x/org/apache/comet/shims/CometTypeShim.scala
@@ -53,4 +53,7 @@ trait CometTypeShim {
// variant shredding layout, so reading such a struct natively returns
nulls. Detect the marker
// and force scan fallback.
def isVariantStruct(s: StructType): Boolean =
VariantMetadata.isVariantStruct(s)
+
+ /**
+  * Returns true when `dt` is Spark's `org.apache.spark.sql.types.TimeType` (Spark 4.1+).
+  *
+  * The check is by fully-qualified class name rather than a type pattern, presumably because
+  * this shim is shared with Spark 4.x builds that lack the `TimeType` class (TODO confirm).
+  * Matching the full name, not a simple-name prefix, keeps unrelated types such as a
+  * user-defined `TimeTypeUDT` from being treated as TIME. A trailing `$` is stripped so that
+  * the companion object's class name also matches, as the previous prefix check allowed.
+  */
+ def isTimeType(dt: DataType): Boolean =
+ dt.getClass.getName.stripSuffix("$") == "org.apache.spark.sql.types.TimeType"
}
diff --git
a/spark/src/test/resources/sql-tests/expressions/datetime/make_time.sql
b/spark/src/test/resources/sql-tests/expressions/datetime/make_time.sql
new file mode 100644
index 000000000..8270e8f32
--- /dev/null
+++ b/spark/src/test/resources/sql-tests/expressions/datetime/make_time.sql
@@ -0,0 +1,141 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- MinSparkVersion: 4.1
+-- Config: spark.sql.timeType.enabled=true
+
+statement
+CREATE TABLE test_make_time(hours int, minutes int, secs decimal(16,6)) USING parquet
+
+statement
+INSERT INTO test_make_time VALUES
+ (0, 0, 0.000000),
+ (12, 30, 45.123456),
+ (23, 59, 59.999999),
+ (1, 2, 3.500000),
+ (0, 0, 0.000001),
+ (NULL, 0, 0.000000),
+ (12, NULL, 30.000000),
+ (12, 30, NULL),
+ (NULL, NULL, NULL)
+
+-- column arguments (spark_answer_only: shuffle does not support TimeType yet; TODO: promote to
+-- full native-verification once SPARK-51779 lands)
+query spark_answer_only
+SELECT hours, minutes, secs, make_time(hours, minutes, secs) FROM test_make_time ORDER BY hours, minutes, secs
+
+-- literal hour, column minutes and secs (spark_answer_only: shuffle does not support TimeType yet)
+query spark_answer_only
+SELECT make_time(10, minutes, secs) FROM test_make_time ORDER BY minutes, secs
+
+-- column hours, literal minutes and secs (spark_answer_only: shuffle does not support TimeType yet)
+query spark_answer_only
+SELECT make_time(hours, 15, 30.5) FROM test_make_time ORDER BY hours
+
+-- all literals
+query
+SELECT make_time(0, 0, 0)
+
+query
+SELECT make_time(12, 30, 45.123456)
+
+query
+SELECT make_time(23, 59, 59.999999)
+
+-- midnight
+query
+SELECT make_time(0, 0, 0.0)
+
+-- one microsecond after midnight
+query
+SELECT make_time(0, 0, 0.000001)
+
+-- null handling with literals
+query
+SELECT make_time(NULL, 0, 0)
+
+query
+SELECT make_time(12, NULL, 0)
+
+query
+SELECT make_time(12, 30, NULL)
+
+-- integer seconds (implicit cast to decimal)
+query
+SELECT make_time(10, 20, 30)
+
+query
+SELECT make_time(1, 2, 0)
+
+-- boundary valid values
+query
+SELECT make_time(23, 0, 0)
+
+query
+SELECT make_time(0, 59, 0)
+
+query
+SELECT make_time(0, 0, 59.999999)
+
+-- invalid hours - should throw error
+query expect_error(HourOfDay)
+SELECT make_time(24, 0, 0)
+
+query expect_error(HourOfDay)
+SELECT make_time(25, 2, 23.5)
+
+query expect_error(HourOfDay)
+SELECT make_time(-1, 0, 0)
+
+-- invalid minutes - should throw error
+query expect_error(MinuteOfHour)
+SELECT make_time(12, 60, 0)
+
+query expect_error(MinuteOfHour)
+SELECT make_time(23, -1, 23.5)
+
+-- invalid seconds - should throw error
+query expect_error(SecondOfMinute)
+SELECT make_time(12, 30, 60.0)
+
+query expect_error(SecondOfMinute)
+SELECT make_time(23, 12, 100.5)
+
+query expect_error(SecondOfMinute)
+SELECT make_time(0, 0, -1.0)
+
+-- overflow seconds
+query expect_error(SecondOfMinute)
+SELECT make_time(1, 18, 4294967297.999999)
+
+-- time literal in comparison (make_time with all literals is constant-folded to a Time literal)
+query
+SELECT make_time(12, 30, 0) > make_time(11, 0, 0)
+
+query
+SELECT make_time(0, 0, 0) = make_time(0, 0, 0)
+
+-- current_time() is foldable and produces a Time literal
+query
+SELECT current_time() IS NOT NULL
diff --git
a/spark/src/test/resources/sql-tests/expressions/datetime/to_time.sql
b/spark/src/test/resources/sql-tests/expressions/datetime/to_time.sql
new file mode 100644
index 000000000..b3ac439fd
--- /dev/null
+++ b/spark/src/test/resources/sql-tests/expressions/datetime/to_time.sql
@@ -0,0 +1,260 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- MinSparkVersion: 4.1
+-- Config: spark.sql.timeType.enabled=true
+
+statement
+CREATE TABLE test_to_time(s STRING) USING parquet
+
+statement
+INSERT INTO test_to_time VALUES
+ ('00:00'),
+ ('12:30'),
+ ('23:59'),
+ ('12:30:45'),
+ ('00:00:00'),
+ ('23:59:59'),
+ ('00:00:00.1'),
+ ('00:00:00.001'),
+ ('00:00:00.000001'),
+ ('00:00:00.1234567'),
+ ('23:59:59.999999'),
+ ('1:2:3'),
+ ('1:2:3.04'),
+ ('T12:30:45'),
+ ('T1:02:3.04'),
+ ('12:00:00 AM'),
+ ('1:00:00 AM'),
+ ('12:00:00 PM'),
+ ('1:00:00 PM'),
+ ('11:59:59 PM'),
+ ('12:59:59.999999 PM'),
+ ('12:00:00AM'),
+ ('1:00:00PM'),
+ (' 12:30:45'),
+ (' 12:30 PM'),
+ (NULL)
+
+-- column argument: basic time formats (spark_answer_only: shuffle does not support TimeType yet;
+-- TODO: promote to full native-verification once SPARK-51779 lands)
+query spark_answer_only
+SELECT s, to_time(s) FROM test_to_time ORDER BY s
+
+-- literal HH:mm
+query
+SELECT to_time('00:00')
+
+query
+SELECT to_time('12:30')
+
+query
+SELECT to_time('23:59')
+
+-- literal HH:mm:ss
+query
+SELECT to_time('12:30:45')
+
+query
+SELECT to_time('00:00:00')
+
+query
+SELECT to_time('23:59:59')
+
+-- fractional seconds
+query
+SELECT to_time('00:00:00.1')
+
+query
+SELECT to_time('00:00:00.001')
+
+query
+SELECT to_time('00:00:00.000001')
+
+query
+SELECT to_time('23:59:59.999999')
+
+-- more than 6 fractional digits (truncated to microseconds)
+query
+SELECT to_time('00:00:00.1234567')
+
+-- single digit hour/min/sec
+query
+SELECT to_time('1:2:3')
+
+query
+SELECT to_time('1:2:3.04')
+
+-- T-prefix
+query
+SELECT to_time('T12:30:45')
+
+query
+SELECT to_time('T1:02:3.04')
+
+-- AM/PM
+query
+SELECT to_time('12:00:00 AM')
+
+query
+SELECT to_time('1:00:00 AM')
+
+query
+SELECT to_time('11:59:59 AM')
+
+query
+SELECT to_time('12:00:00 PM')
+
+query
+SELECT to_time('1:00:00 PM')
+
+query
+SELECT to_time('11:59:59 PM')
+
+-- AM/PM case insensitive
+query
+SELECT to_time('12:00:00 am')
+
+query
+SELECT to_time('12:00:00 pm')
+
+-- AM/PM without space
+query
+SELECT to_time('12:00:00AM')
+
+query
+SELECT to_time('1:00:00PM')
+
+-- AM/PM with fractional seconds
+query
+SELECT to_time('12:59:59.999999 PM')
+
+-- null input
+query
+SELECT to_time(NULL)
+
+-- trailing whitespace
+query
+SELECT to_time('12:30:45 ')
+
+-- leading whitespace
+query
+SELECT to_time(' 12:30:45')
+
+-- multiple leading spaces
+query
+SELECT to_time('   12:30:45')
+
+query
+SELECT to_time(' 12:30:45 ')
+
+query
+SELECT to_time(' 12:30')
+
+query
+SELECT to_time(' 12:30 PM')
+
+query
+SELECT to_time(' 1:00:00 PM')
+
+-- leading tab and newline
+query
+SELECT to_time('\t12:30:45')
+
+query
+SELECT to_time('\n12:30:45')
+
+-- leading whitespace with T-prefix is rejected by Spark
+query expect_error(cannot be parsed to a TIME value)
+SELECT to_time(' T12:30:45')
+
+-- invalid inputs - should throw error with to_time
+query expect_error(cannot be parsed to a TIME value)
+SELECT to_time('')
+
+query expect_error(cannot be parsed to a TIME value)
+SELECT to_time('XYZ')
+
+query expect_error(cannot be parsed to a TIME value)
+SELECT to_time('24:00:00')
+
+query expect_error(cannot be parsed to a TIME value)
+SELECT to_time('23:60:00')
+
+query expect_error(cannot be parsed to a TIME value)
+SELECT to_time('23:00:60')
+
+query expect_error(cannot be parsed to a TIME value)
+SELECT to_time('120000')
+
+-- invalid AM/PM - should throw error
+query expect_error(cannot be parsed to a TIME value)
+SELECT to_time('0:00:00 AM')
+
+query expect_error(cannot be parsed to a TIME value)
+SELECT to_time('13:00:00 AM')
+
+query expect_error(cannot be parsed to a TIME value)
+SELECT to_time('13:00:00 PM')
+
+-- try_to_time: returns null for invalid inputs
+query
+SELECT try_to_time('12:30:45')
+
+query
+SELECT try_to_time('')
+
+query
+SELECT try_to_time('XYZ')
+
+query
+SELECT try_to_time('24:00:00')
+
+query
+SELECT try_to_time('23:60:00')
+
+query
+SELECT try_to_time(NULL)
+
+query
+SELECT try_to_time('0:00:00 AM')
+
+query
+SELECT try_to_time('13:00:00 PM')
+
+-- try_to_time: leading whitespace parses successfully
+query
+SELECT try_to_time(' 12:30:45')
+
+query
+SELECT try_to_time(' 1:00:00 PM')
+
+-- to_time with format pattern falls back to Spark (not supported natively)
+query expect_fallback(invoke is not supported)
+SELECT to_time('12:30:45', 'HH:mm:ss')
+
+statement
+CREATE TABLE test_to_time_col_fmt(s STRING, f STRING) USING parquet
+
+statement
+INSERT INTO test_to_time_col_fmt VALUES
+ ('14.30.00', 'HH.mm.ss'),
+ ('1230', 'HHmm')
+
+-- A non-foldable format column should fall back to Spark because Comet does
+-- not implement the format-pattern variant of to_time.
+query expect_fallback(invoke is not supported)
+SELECT to_time(s, f) FROM test_to_time_col_fmt
diff --git
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
index b7a14d408..224f2ad34 100644
---
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
+++
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
@@ -103,6 +103,34 @@ object CometDatetimeExpressionBenchmark extends
CometBenchmarkBase {
}
}
+ /**
+  * Benchmarks `to_time` on "H:MM:SS" strings built from the source table: the hour is
+  * `abs(value) % 24`, and the minute and second are both `abs(value) % 60`, zero-padded to two
+  * digits.
+  */
+ def toTimeBenchmark(values: Int): Unit = {
+ val hour = "cast(abs(value) % 24 as string)"
+ val paddedMinuteOrSecond = "lpad(cast(abs(value) % 60 as string), 2, '0')"
+ withTempPath { dir =>
+ withTempTable("parquetV1Table") {
+ prepareTable(
+ dir,
+ spark.sql(
+ s"select concat($hour, ':', $paddedMinuteOrSecond, ':', $paddedMinuteOrSecond) as s " +
+ s"FROM $tbl"))
+ val name = "to_time"
+ val query = "select to_time(s) from parquetV1Table"
+ runExpressionBenchmark(name, values, query)
+ }
+ }
+ }
+
+ /**
+  * Benchmarks `make_time` over an int hour column (`abs(value) % 24`), an int minute column
+  * (`abs(value) % 60`) and a decimal(16,6) seconds column (`abs(value) % 60`) built from the
+  * source table.
+  */
+ def makeTimeBenchmark(values: Int): Unit = {
+ withTempPath { dir =>
+ withTempTable("parquetV1Table") {
+ prepareTable(
+ dir,
+ spark.sql(
+ "select cast(abs(value) % 24 as int) as h, cast(abs(value) % 60 as int) as m, " +
+ s"cast(abs(value) % 60 as decimal(16,6)) as s FROM $tbl"))
+ val name = "make_time"
+ val query = "select make_time(h, m, s) from parquetV1Table"
+ runExpressionBenchmark(name, values, query)
+ }
+ }
+ }
+
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
val values = 1024 * 1024;
@@ -130,6 +158,15 @@ object CometDatetimeExpressionBenchmark extends
CometBenchmarkBase {
}
}
}
+
+ withSQLConf("spark.sql.timeType.enabled" -> "true") {
+ runBenchmarkWithTable("ToTime", values) { v =>
+ toTimeBenchmark(v)
+ }
+ runBenchmarkWithTable("MakeTime", values) { v =>
+ makeTimeBenchmark(v)
+ }
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]