This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new bd7834c1 Add support for time-zone, 3 & 5 digit years: Cast from
string to timestamp (#704)
bd7834c1 is described below
commit bd7834c12c5d82abc5638937f3318667d2c6e53e
Author: Akhil S S <[email protected]>
AuthorDate: Thu Aug 1 20:30:58 2024 +0530
Add support for time-zone, 3 & 5 digit years: Cast from string to timestamp
(#704)
---
native/spark-expr/src/cast.rs | 343 +++++++++++++++------
.../scala/org/apache/comet/CometCastSuite.scala | 20 +-
2 files changed, 267 insertions(+), 96 deletions(-)
diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs
index ae081897..e44b1c9f 100644
--- a/native/spark-expr/src/cast.rs
+++ b/native/spark-expr/src/cast.rs
@@ -15,14 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::{
- any::Any,
- fmt::{Debug, Display, Formatter},
- hash::{Hash, Hasher},
- num::Wrapping,
- sync::Arc,
-};
-
use arrow::{
array::{
cast::AsArray,
@@ -42,6 +34,14 @@ use arrow::{
};
use arrow_array::DictionaryArray;
use arrow_schema::{DataType, Schema};
+use std::str::FromStr;
+use std::{
+ any::Any,
+ fmt::{Debug, Display, Formatter},
+ hash::{Hash, Hasher},
+ num::Wrapping,
+ sync::Arc,
+};
use datafusion_common::{
cast::as_generic_string_array, internal_err, Result as DataFusionResult,
ScalarValue,
@@ -56,6 +56,7 @@ use num::{
};
use regex::Regex;
+use crate::timezone;
use crate::utils::{array_with_timezone, down_cast_any_ref};
use crate::{EvalMode, SparkError, SparkResult};
@@ -71,6 +72,67 @@ static CAST_OPTIONS: CastOptions = CastOptions {
.with_timestamp_format(TIMESTAMP_FORMAT),
};
+struct TimeStampInfo {
+ year: i32,
+ month: u32,
+ day: u32,
+ hour: u32,
+ minute: u32,
+ second: u32,
+ microsecond: u32,
+}
+
+impl Default for TimeStampInfo {
+ fn default() -> Self {
+ TimeStampInfo {
+ year: 1,
+ month: 1,
+ day: 1,
+ hour: 0,
+ minute: 0,
+ second: 0,
+ microsecond: 0,
+ }
+ }
+}
+
+impl TimeStampInfo {
+ pub fn with_year(&mut self, year: i32) -> &mut Self {
+ self.year = year;
+ self
+ }
+
+ pub fn with_month(&mut self, month: u32) -> &mut Self {
+ self.month = month;
+ self
+ }
+
+ pub fn with_day(&mut self, day: u32) -> &mut Self {
+ self.day = day;
+ self
+ }
+
+ pub fn with_hour(&mut self, hour: u32) -> &mut Self {
+ self.hour = hour;
+ self
+ }
+
+ pub fn with_minute(&mut self, minute: u32) -> &mut Self {
+ self.minute = minute;
+ self
+ }
+
+ pub fn with_second(&mut self, second: u32) -> &mut Self {
+ self.second = second;
+ self
+ }
+
+ pub fn with_microsecond(&mut self, microsecond: u32) -> &mut Self {
+ self.microsecond = microsecond;
+ self
+ }
+}
+
#[derive(Debug, Hash)]
pub struct Cast {
pub child: Arc<dyn PhysicalExpr>,
@@ -100,13 +162,15 @@ macro_rules! cast_utf8_to_int {
}};
}
macro_rules! cast_utf8_to_timestamp {
- ($array:expr, $eval_mode:expr, $array_type:ty, $cast_method:ident) => {{
+ ($array:expr, $eval_mode:expr, $array_type:ty, $cast_method:ident,
$tz:expr) => {{
let len = $array.len();
let mut cast_array =
PrimitiveArray::<$array_type>::builder(len).with_timezone("UTC");
for i in 0..len {
if $array.is_null(i) {
cast_array.append_null()
- } else if let Ok(Some(cast_value)) =
$cast_method($array.value(i).trim(), $eval_mode) {
+ } else if let Ok(Some(cast_value)) =
+ $cast_method($array.value(i).trim(), $eval_mode, $tz)
+ {
cast_array.append_value(cast_value);
} else {
cast_array.append_null()
@@ -574,7 +638,7 @@ fn cast_array(
spark_cast_utf8_to_boolean::<i64>(&array, eval_mode)
}
(DataType::Utf8, DataType::Timestamp(_, _)) => {
- cast_string_to_timestamp(&array, to_type, eval_mode)
+ cast_string_to_timestamp(&array, to_type, eval_mode, &timezone)
}
(DataType::Utf8, DataType::Date32) => cast_string_to_date(&array,
to_type, eval_mode),
(DataType::Int64, DataType::Int32)
@@ -782,19 +846,23 @@ fn cast_string_to_timestamp(
array: &ArrayRef,
to_type: &DataType,
eval_mode: EvalMode,
+ timezone_str: &str,
) -> SparkResult<ArrayRef> {
let string_array = array
.as_any()
.downcast_ref::<GenericStringArray<i32>>()
.expect("Expected a string array");
+ let tz = &timezone::Tz::from_str(timezone_str).unwrap();
+
let cast_array: ArrayRef = match to_type {
DataType::Timestamp(_, _) => {
cast_utf8_to_timestamp!(
string_array,
eval_mode,
TimestampMicrosecondType,
- timestamp_parser
+ timestamp_parser,
+ tz
)
}
_ => unreachable!("Invalid data type {:?} in cast from string",
to_type),
@@ -1344,7 +1412,11 @@ impl PhysicalExpr for Cast {
}
}
-fn timestamp_parser(value: &str, eval_mode: EvalMode) ->
SparkResult<Option<i64>> {
+fn timestamp_parser<T: TimeZone>(
+ value: &str,
+ eval_mode: EvalMode,
+ tz: &T,
+) -> SparkResult<Option<i64>> {
let value = value.trim();
if value.is_empty() {
return Ok(None);
@@ -1352,31 +1424,31 @@ fn timestamp_parser(value: &str, eval_mode: EvalMode)
-> SparkResult<Option<i64>
// Define regex patterns and corresponding parsing functions
let patterns = &[
(
- Regex::new(r"^\d{4}$").unwrap(),
- parse_str_to_year_timestamp as fn(&str) ->
SparkResult<Option<i64>>,
+ Regex::new(r"^\d{4,5}$").unwrap(),
+ parse_str_to_year_timestamp as fn(&str, &T) ->
SparkResult<Option<i64>>,
),
(
- Regex::new(r"^\d{4}-\d{2}$").unwrap(),
+ Regex::new(r"^\d{4,5}-\d{2}$").unwrap(),
parse_str_to_month_timestamp,
),
(
- Regex::new(r"^\d{4}-\d{2}-\d{2}$").unwrap(),
+ Regex::new(r"^\d{4,5}-\d{2}-\d{2}$").unwrap(),
parse_str_to_day_timestamp,
),
(
- Regex::new(r"^\d{4}-\d{2}-\d{2}T\d{1,2}$").unwrap(),
+ Regex::new(r"^\d{4,5}-\d{2}-\d{2}T\d{1,2}$").unwrap(),
parse_str_to_hour_timestamp,
),
(
- Regex::new(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}$").unwrap(),
+ Regex::new(r"^\d{4,5}-\d{2}-\d{2}T\d{2}:\d{2}$").unwrap(),
parse_str_to_minute_timestamp,
),
(
- Regex::new(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$").unwrap(),
+ Regex::new(r"^\d{4,5}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$").unwrap(),
parse_str_to_second_timestamp,
),
(
-
Regex::new(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,6}$").unwrap(),
+
Regex::new(r"^\d{4,5}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,6}$").unwrap(),
parse_str_to_microsecond_timestamp,
),
(
@@ -1390,7 +1462,7 @@ fn timestamp_parser(value: &str, eval_mode: EvalMode) ->
SparkResult<Option<i64>
// Iterate through patterns and try matching
for (pattern, parse_func) in patterns {
if pattern.is_match(value) {
- timestamp = parse_func(value)?;
+ timestamp = parse_func(value, tz)?;
break;
}
}
@@ -1415,38 +1487,24 @@ fn timestamp_parser(value: &str, eval_mode: EvalMode)
-> SparkResult<Option<i64>
}
}
-fn parse_ymd_timestamp(year: i32, month: u32, day: u32) ->
SparkResult<Option<i64>> {
- let datetime = chrono::Utc.with_ymd_and_hms(year, month, day, 0, 0, 0);
-
- // Check if datetime is not None
- let utc_datetime = match datetime.single() {
- Some(dt) => dt.with_timezone(&chrono::Utc),
- None => {
- return Err(SparkError::Internal(
- "Failed to parse timestamp".to_string(),
- ));
- }
- };
-
- Ok(Some(utc_datetime.timestamp_micros()))
-}
-
-fn parse_hms_timestamp(
- year: i32,
- month: u32,
- day: u32,
- hour: u32,
- minute: u32,
- second: u32,
- microsecond: u32,
+fn parse_timestamp_to_micros<T: TimeZone>(
+ timestamp_info: &TimeStampInfo,
+ tz: &T,
) -> SparkResult<Option<i64>> {
- let datetime = chrono::Utc.with_ymd_and_hms(year, month, day, hour,
minute, second);
+ let datetime = tz.with_ymd_and_hms(
+ timestamp_info.year,
+ timestamp_info.month,
+ timestamp_info.day,
+ timestamp_info.hour,
+ timestamp_info.minute,
+ timestamp_info.second,
+ );
// Check if datetime is not None
- let utc_datetime = match datetime.single() {
+ let tz_datetime = match datetime.single() {
Some(dt) => dt
- .with_timezone(&chrono::Utc)
- .with_nanosecond(microsecond * 1000),
+ .with_timezone(tz)
+ .with_nanosecond(timestamp_info.microsecond * 1000),
None => {
return Err(SparkError::Internal(
"Failed to parse timestamp".to_string(),
@@ -1454,7 +1512,7 @@ fn parse_hms_timestamp(
}
};
- let result = match utc_datetime {
+ let result = match tz_datetime {
Some(dt) => dt.timestamp_micros(),
None => {
return Err(SparkError::Internal(
@@ -1466,7 +1524,11 @@ fn parse_hms_timestamp(
Ok(Some(result))
}
-fn get_timestamp_values(value: &str, timestamp_type: &str) ->
SparkResult<Option<i64>> {
+fn get_timestamp_values<T: TimeZone>(
+ value: &str,
+ timestamp_type: &str,
+ tz: &T,
+) -> SparkResult<Option<i64>> {
let values: Vec<_> = value
.split(|c| c == 'T' || c == '-' || c == ':' || c == '.')
.collect();
@@ -1478,64 +1540,99 @@ fn get_timestamp_values(value: &str, timestamp_type:
&str) -> SparkResult<Option
let second = values.get(5).map_or(0, |s| s.parse::<u32>().unwrap_or(0));
let microsecond = values.get(6).map_or(0, |ms|
ms.parse::<u32>().unwrap_or(0));
- match timestamp_type {
- "year" => parse_ymd_timestamp(year, 1, 1),
- "month" => parse_ymd_timestamp(year, month, 1),
- "day" => parse_ymd_timestamp(year, month, day),
- "hour" => parse_hms_timestamp(year, month, day, hour, 0, 0, 0),
- "minute" => parse_hms_timestamp(year, month, day, hour, minute, 0, 0),
- "second" => parse_hms_timestamp(year, month, day, hour, minute,
second, 0),
- "microsecond" => parse_hms_timestamp(year, month, day, hour, minute,
second, microsecond),
- _ => Err(SparkError::CastInvalidValue {
- value: value.to_string(),
- from_type: "STRING".to_string(),
- to_type: "TIMESTAMP".to_string(),
- }),
- }
+ let mut timestamp_info = TimeStampInfo::default();
+
+ let timestamp_info = match timestamp_type {
+ "year" => timestamp_info.with_year(year),
+ "month" => timestamp_info.with_year(year).with_month(month),
+ "day" => timestamp_info
+ .with_year(year)
+ .with_month(month)
+ .with_day(day),
+ "hour" => timestamp_info
+ .with_year(year)
+ .with_month(month)
+ .with_day(day)
+ .with_hour(hour),
+ "minute" => timestamp_info
+ .with_year(year)
+ .with_month(month)
+ .with_day(day)
+ .with_hour(hour)
+ .with_minute(minute),
+ "second" => timestamp_info
+ .with_year(year)
+ .with_month(month)
+ .with_day(day)
+ .with_hour(hour)
+ .with_minute(minute)
+ .with_second(second),
+ "microsecond" => timestamp_info
+ .with_year(year)
+ .with_month(month)
+ .with_day(day)
+ .with_hour(hour)
+ .with_minute(minute)
+ .with_second(second)
+ .with_microsecond(microsecond),
+ _ => {
+ return Err(SparkError::CastInvalidValue {
+ value: value.to_string(),
+ from_type: "STRING".to_string(),
+ to_type: "TIMESTAMP".to_string(),
+ })
+ }
+ };
+
+ parse_timestamp_to_micros(timestamp_info, tz)
}
-fn parse_str_to_year_timestamp(value: &str) -> SparkResult<Option<i64>> {
- get_timestamp_values(value, "year")
+fn parse_str_to_year_timestamp<T: TimeZone>(value: &str, tz: &T) ->
SparkResult<Option<i64>> {
+ get_timestamp_values(value, "year", tz)
}
-fn parse_str_to_month_timestamp(value: &str) -> SparkResult<Option<i64>> {
- get_timestamp_values(value, "month")
+fn parse_str_to_month_timestamp<T: TimeZone>(value: &str, tz: &T) ->
SparkResult<Option<i64>> {
+ get_timestamp_values(value, "month", tz)
}
-fn parse_str_to_day_timestamp(value: &str) -> SparkResult<Option<i64>> {
- get_timestamp_values(value, "day")
+fn parse_str_to_day_timestamp<T: TimeZone>(value: &str, tz: &T) ->
SparkResult<Option<i64>> {
+ get_timestamp_values(value, "day", tz)
}
-fn parse_str_to_hour_timestamp(value: &str) -> SparkResult<Option<i64>> {
- get_timestamp_values(value, "hour")
+fn parse_str_to_hour_timestamp<T: TimeZone>(value: &str, tz: &T) ->
SparkResult<Option<i64>> {
+ get_timestamp_values(value, "hour", tz)
}
-fn parse_str_to_minute_timestamp(value: &str) -> SparkResult<Option<i64>> {
- get_timestamp_values(value, "minute")
+fn parse_str_to_minute_timestamp<T: TimeZone>(value: &str, tz: &T) ->
SparkResult<Option<i64>> {
+ get_timestamp_values(value, "minute", tz)
}
-fn parse_str_to_second_timestamp(value: &str) -> SparkResult<Option<i64>> {
- get_timestamp_values(value, "second")
+fn parse_str_to_second_timestamp<T: TimeZone>(value: &str, tz: &T) ->
SparkResult<Option<i64>> {
+ get_timestamp_values(value, "second", tz)
}
-fn parse_str_to_microsecond_timestamp(value: &str) -> SparkResult<Option<i64>>
{
- get_timestamp_values(value, "microsecond")
+fn parse_str_to_microsecond_timestamp<T: TimeZone>(
+ value: &str,
+ tz: &T,
+) -> SparkResult<Option<i64>> {
+ get_timestamp_values(value, "microsecond", tz)
}
-fn parse_str_to_time_only_timestamp(value: &str) -> SparkResult<Option<i64>> {
+fn parse_str_to_time_only_timestamp<T: TimeZone>(value: &str, tz: &T) ->
SparkResult<Option<i64>> {
let values: Vec<&str> = value.split('T').collect();
let time_values: Vec<u32> = values[1]
.split(':')
.map(|v| v.parse::<u32>().unwrap_or(0))
.collect();
- let datetime = chrono::Utc::now();
+ let datetime = tz.from_utc_datetime(&chrono::Utc::now().naive_utc());
let timestamp = datetime
+ .with_timezone(tz)
.with_hour(time_values.first().copied().unwrap_or_default())
.and_then(|dt| dt.with_minute(*time_values.get(1).unwrap_or(&0)))
.and_then(|dt| dt.with_second(*time_values.get(2).unwrap_or(&0)))
.and_then(|dt| dt.with_nanosecond(*time_values.get(3).unwrap_or(&0) *
1_000))
- .map(|dt| dt.to_utc().timestamp_micros())
+ .map(|dt| dt.timestamp_micros())
.unwrap_or_default();
Ok(Some(timestamp))
@@ -1746,41 +1843,99 @@ mod tests {
use arrow::datatypes::TimestampMicrosecondType;
use arrow_array::StringArray;
use arrow_schema::TimeUnit;
+ use std::str::FromStr;
use super::*;
#[test]
#[cfg_attr(miri, ignore)] // test takes too long with miri
fn timestamp_parser_test() {
+ let tz = &timezone::Tz::from_str("UTC").unwrap();
// write for all formats
assert_eq!(
- timestamp_parser("2020", EvalMode::Legacy).unwrap(),
+ timestamp_parser("2020", EvalMode::Legacy, tz).unwrap(),
Some(1577836800000000) // this is in microseconds
);
assert_eq!(
- timestamp_parser("2020-01", EvalMode::Legacy).unwrap(),
+ timestamp_parser("2020-01", EvalMode::Legacy, tz).unwrap(),
Some(1577836800000000)
);
assert_eq!(
- timestamp_parser("2020-01-01", EvalMode::Legacy).unwrap(),
+ timestamp_parser("2020-01-01", EvalMode::Legacy, tz).unwrap(),
Some(1577836800000000)
);
assert_eq!(
- timestamp_parser("2020-01-01T12", EvalMode::Legacy).unwrap(),
+ timestamp_parser("2020-01-01T12", EvalMode::Legacy, tz).unwrap(),
Some(1577880000000000)
);
assert_eq!(
- timestamp_parser("2020-01-01T12:34", EvalMode::Legacy).unwrap(),
+ timestamp_parser("2020-01-01T12:34", EvalMode::Legacy,
tz).unwrap(),
Some(1577882040000000)
);
assert_eq!(
- timestamp_parser("2020-01-01T12:34:56", EvalMode::Legacy).unwrap(),
+ timestamp_parser("2020-01-01T12:34:56", EvalMode::Legacy,
tz).unwrap(),
Some(1577882096000000)
);
assert_eq!(
- timestamp_parser("2020-01-01T12:34:56.123456",
EvalMode::Legacy).unwrap(),
+ timestamp_parser("2020-01-01T12:34:56.123456", EvalMode::Legacy,
tz).unwrap(),
Some(1577882096123456)
);
+ assert_eq!(
+ timestamp_parser("0100", EvalMode::Legacy, tz).unwrap(),
+ Some(-59011459200000000)
+ );
+ assert_eq!(
+ timestamp_parser("0100-01", EvalMode::Legacy, tz).unwrap(),
+ Some(-59011459200000000)
+ );
+ assert_eq!(
+ timestamp_parser("0100-01-01", EvalMode::Legacy, tz).unwrap(),
+ Some(-59011459200000000)
+ );
+ assert_eq!(
+ timestamp_parser("0100-01-01T12", EvalMode::Legacy, tz).unwrap(),
+ Some(-59011416000000000)
+ );
+ assert_eq!(
+ timestamp_parser("0100-01-01T12:34", EvalMode::Legacy,
tz).unwrap(),
+ Some(-59011413960000000)
+ );
+ assert_eq!(
+ timestamp_parser("0100-01-01T12:34:56", EvalMode::Legacy,
tz).unwrap(),
+ Some(-59011413904000000)
+ );
+ assert_eq!(
+ timestamp_parser("0100-01-01T12:34:56.123456", EvalMode::Legacy,
tz).unwrap(),
+ Some(-59011413903876544)
+ );
+ assert_eq!(
+ timestamp_parser("10000", EvalMode::Legacy, tz).unwrap(),
+ Some(253402300800000000)
+ );
+ assert_eq!(
+ timestamp_parser("10000-01", EvalMode::Legacy, tz).unwrap(),
+ Some(253402300800000000)
+ );
+ assert_eq!(
+ timestamp_parser("10000-01-01", EvalMode::Legacy, tz).unwrap(),
+ Some(253402300800000000)
+ );
+ assert_eq!(
+ timestamp_parser("10000-01-01T12", EvalMode::Legacy, tz).unwrap(),
+ Some(253402344000000000)
+ );
+ assert_eq!(
+ timestamp_parser("10000-01-01T12:34", EvalMode::Legacy,
tz).unwrap(),
+ Some(253402346040000000)
+ );
+ assert_eq!(
+ timestamp_parser("10000-01-01T12:34:56", EvalMode::Legacy,
tz).unwrap(),
+ Some(253402346096000000)
+ );
+ assert_eq!(
+ timestamp_parser("10000-01-01T12:34:56.123456", EvalMode::Legacy,
tz).unwrap(),
+ Some(253402346096123456)
+ );
// assert_eq!(
// timestamp_parser("T2", EvalMode::Legacy).unwrap(),
// Some(1714356000000000) // this value needs to change everyday.
@@ -1793,7 +1948,10 @@ mod tests {
let array: ArrayRef = Arc::new(StringArray::from(vec![
Some("2020-01-01T12:34:56.123456"),
Some("T2"),
+ Some("0100-01-01T12:34:56.123456"),
+ Some("10000-01-01T12:34:56.123456"),
]));
+ let tz = &timezone::Tz::from_str("UTC").unwrap();
let string_array = array
.as_any()
@@ -1805,14 +1963,15 @@ mod tests {
&string_array,
eval_mode,
TimestampMicrosecondType,
- timestamp_parser
+ timestamp_parser,
+ tz
);
assert_eq!(
result.data_type(),
&DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
);
- assert_eq!(result.len(), 2);
+ assert_eq!(result.len(), 4);
}
#[test]
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 4aaa52cd..833b77d5 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -663,9 +663,7 @@ class CometCastSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
test("cast StringType to TimestampType - subset of supported values") {
- withSQLConf(
- SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC",
- CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+ withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu") {
val values = Seq(
"2020",
"2020-01",
@@ -675,7 +673,21 @@ class CometCastSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
"2020-01-01T12:34:56",
"2020-01-01T12:34:56.123456",
"T2",
- "-9?")
+ "-9?",
+ "0100",
+ "0100-01",
+ "0100-01-01",
+ "0100-01-01T12",
+ "0100-01-01T12:34",
+ "0100-01-01T12:34:56",
+ "0100-01-01T12:34:56.123456",
+ "10000",
+ "10000-01",
+ "10000-01-01",
+ "10000-01-01T12",
+ "10000-01-01T12:34",
+ "10000-01-01T12:34:56",
+ "10000-01-01T12:34:56.123456")
castTimestampTest(values.toDF("a"), DataTypes.TimestampType)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]