andygrove commented on code in PR #4753:
URL: https://github.com/apache/datafusion-comet/pull/4753#discussion_r3502117034
##########
spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala:
##########
@@ -1032,17 +1032,20 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
test("native opt-in hint shown for codegen-dispatch path") {
withTempDir { dir =>
- // _5 column is TIMESTAMP(MICROS,false) = TimestampNTZ, which triggers Incompatible in CometHour
+ // _4 column is TIMESTAMP(MICROS,true) = TimestampType. FromUTCTimestamp is Incompatible
+ // (its native timezone parser is stricter than Spark's), so with allowIncompatible=false it
+ // rides the codegen dispatcher and surfaces the native opt-in hint.
val path = new Path(dir.toURI.toString, "hint_test.parquet")
makeRawTimeParquetFile(path, dictionaryEnabled = false, n = 5)
- withSQLConf("spark.comet.expression.Hour.allowIncompatible" -> "false") {
- val df = spark.read.parquet(path.toString).selectExpr("hour(_5) as h")
+ withSQLConf("spark.comet.expression.FromUTCTimestamp.allowIncompatible"
-> "false") {
Review Comment:
Yes, the `false` is intentional. With `allowIncompatible=false`, the
`FromUTCTimestamp` expression (marked Incompatible) can't run on the native
path, so it is routed through the codegen dispatcher and surfaces the native
opt-in hint, which is exactly what this test asserts. The comment just above
the `withSQLConf` block explains the setup.
##########
native/spark-expr/src/datetime_funcs/extract_date_part.rs:
##########
@@ -98,3 +116,128 @@ macro_rules! extract_date_part {
extract_date_part!(SparkHour, "hour", Hour);
extract_date_part!(SparkMinute, "minute", Minute);
extract_date_part!(SparkSecond, "second", Second);
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::array::{ArrayRef, Int32Array, TimestampMicrosecondArray};
+ use arrow::datatypes::{Field, Int8Type, TimeUnit};
+ use datafusion::config::ConfigOptions;
+ use std::sync::Arc;
+
+ // 2024-01-15 18:30:45 UTC, in microseconds since the epoch.
+ const MICROS: i64 = 1_705_343_445_000_000;
+
+ /// Invokes a single-argument date-part UDF on `array` and returns the
first extracted value.
+ fn invoke<U: ScalarUDFImpl>(udf: &U, array: ArrayRef) -> i32 {
+ let return_field = Arc::new(Field::new("v", DataType::Int32, true));
+ let args = ScalarFunctionArgs {
+ args: vec![ColumnarValue::Array(array)],
+ number_rows: 1,
+ return_field,
+ config_options: Arc::new(ConfigOptions::default()),
+ arg_fields: vec![],
+ };
+ match udf.invoke_with_args(args).unwrap() {
+ ColumnarValue::Array(arr) => {
+ arr.as_any().downcast_ref::<Int32Array>().unwrap().value(0)
+ }
+ _ => panic!("Expected array"),
+ }
+ }
+
+ fn ntz_array() -> ArrayRef {
+ Arc::new(TimestampMicrosecondArray::from(vec![Some(MICROS)]))
+ }
+
+ #[test]
+ fn timestamp_with_timezone_converts_to_session_timezone() {
+ // A timezone-aware timestamp stores the absolute UTC instant. In a
Los Angeles session
+ // (UTC-8 in January) the local hour is 18 - 8 = 10.
+ let udf = SparkHour::new("America/Los_Angeles".to_string());
+ let array =
TimestampMicrosecondArray::from(vec![Some(MICROS)]).with_timezone("UTC");
+ assert_eq!(invoke(&udf, Arc::new(array)), 10);
+ }
+
+ #[test]
+ fn timestamp_ntz_extracts_local_time_without_conversion() {
+ // TimestampNTZ stores local wall-clock time, so the hour is extracted
directly (18) with
+ // no session timezone offset applied (issue #3180).
+ let udf = SparkHour::new("America/Los_Angeles".to_string());
+ assert_eq!(invoke(&udf, ntz_array()), 18);
+ }
+
+ #[test]
+ fn timestamp_ntz_result_is_independent_of_session_timezone() {
+ // The extracted hour must be the same regardless of the session
timezone.
+ for tz in ["UTC", "America/Los_Angeles", "Asia/Tokyo"] {
+ let udf = SparkHour::new(tz.to_string());
+ assert_eq!(invoke(&udf, ntz_array()), 18);
+ }
+ }
+
+ #[test]
+ fn minute_and_second_on_timestamp_ntz() {
+ assert_eq!(
+ invoke(&SparkMinute::new("Asia/Tokyo".to_string()), ntz_array()),
+ 30
+ );
+ assert_eq!(
+ invoke(&SparkSecond::new("Asia/Tokyo".to_string()), ntz_array()),
+ 45
+ );
+ }
+
+ #[test]
+ fn is_timestamp_ntz_detects_plain_and_dictionary_wrapped() {
+ assert!(is_timestamp_ntz(&DataType::Timestamp(
+ TimeUnit::Microsecond,
+ None
+ )));
+ assert!(is_timestamp_ntz(&DataType::Dictionary(
+ Box::new(DataType::Int8),
+ Box::new(DataType::Timestamp(TimeUnit::Microsecond, None)),
+ )));
+ // Timezone-aware timestamps and dictionaries of them are not NTZ.
+ assert!(!is_timestamp_ntz(&DataType::Timestamp(
+ TimeUnit::Microsecond,
+ Some("UTC".into()),
+ )));
+ assert!(!is_timestamp_ntz(&DataType::Dictionary(
+ Box::new(DataType::Int8),
+ Box::new(DataType::Timestamp(
+ TimeUnit::Microsecond,
+ Some("UTC".into())
+ )),
+ )));
+ }
+
+ #[test]
+ fn timestamp_ntz_dictionary_input_extracts_local_time() {
+ use arrow::array::{DictionaryArray, Int8Array};
+ let values =
Arc::new(TimestampMicrosecondArray::from(vec![Some(MICROS)])) as ArrayRef;
+ let keys = Int8Array::from(vec![0i8]);
Review Comment:
Good catch; the test now matches the real production shape. I switched it
to an `Int32`-keyed dictionary (Comet only emits `Int32` keys) so the input
lines up with the declared `Dictionary(Int32, Int32)` return type. The test now
asserts that the result is still a `Dictionary(Int32, Int32)` and reads the
value through the dictionary instead of casting it away. Done in 9848009.
##########
native/spark-expr/src/datetime_funcs/extract_date_part.rs:
##########
@@ -24,6 +24,17 @@ use datafusion::logical_expr::{
};
use std::fmt::Debug;
+/// Returns true when the type is a timestamp without a timezone (Spark's TimestampNTZType),
+/// including when wrapped in a dictionary. Such values store local wall-clock time and must not
+/// have any session timezone offset applied when extracting date parts.
+fn is_timestamp_ntz(data_type: &DataType) -> bool {
Review Comment:
Agreed, good forward pointer. If #2649 (`date_trunc` NTZ handling) gets
picked up, that work should reuse this `is_timestamp_ntz` predicate directly
rather than re-deriving it. I'm leaving it here for now because this module is
its only caller, but it is worth promoting to a shared helper when a second use
case lands.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]