andygrove commented on code in PR #4753:
URL: https://github.com/apache/datafusion-comet/pull/4753#discussion_r3502117034


##########
spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala:
##########
@@ -1032,17 +1032,20 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("native opt-in hint shown for codegen-dispatch path") {
     withTempDir { dir =>
-      // _5 column is TIMESTAMP(MICROS,false) = TimestampNTZ, which triggers Incompatible in CometHour
+      // _4 column is TIMESTAMP(MICROS,true) = TimestampType. FromUTCTimestamp is Incompatible
+      // (its native timezone parser is stricter than Spark's), so with allowIncompatible=false it
+      // rides the codegen dispatcher and surfaces the native opt-in hint.
       val path = new Path(dir.toURI.toString, "hint_test.parquet")
       makeRawTimeParquetFile(path, dictionaryEnabled = false, n = 5)
-      withSQLConf("spark.comet.expression.Hour.allowIncompatible" -> "false") {
-        val df = spark.read.parquet(path.toString).selectExpr("hour(_5) as h")
+      withSQLConf("spark.comet.expression.FromUTCTimestamp.allowIncompatible" -> "false") {

Review Comment:
   Yes, the `false` is intentional. With `allowIncompatible=false`, the 
Incompatible `FromUTCTimestamp` can't run on the native path, so it rides the 
codegen dispatcher and surfaces the native opt-in hint, which is exactly what 
this test asserts. The comment just above the `withSQLConf` block explains the 
setup.



##########
native/spark-expr/src/datetime_funcs/extract_date_part.rs:
##########
@@ -98,3 +116,128 @@ macro_rules! extract_date_part {
 extract_date_part!(SparkHour, "hour", Hour);
 extract_date_part!(SparkMinute, "minute", Minute);
 extract_date_part!(SparkSecond, "second", Second);
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{ArrayRef, Int32Array, TimestampMicrosecondArray};
+    use arrow::datatypes::{Field, Int8Type, TimeUnit};
+    use datafusion::config::ConfigOptions;
+    use std::sync::Arc;
+
+    // 2024-01-15 18:30:45 UTC, in microseconds since the epoch.
+    const MICROS: i64 = 1_705_343_445_000_000;
+
+    /// Invokes a single-argument date-part UDF on `array` and returns the first extracted value.
+    fn invoke<U: ScalarUDFImpl>(udf: &U, array: ArrayRef) -> i32 {
+        let return_field = Arc::new(Field::new("v", DataType::Int32, true));
+        let args = ScalarFunctionArgs {
+            args: vec![ColumnarValue::Array(array)],
+            number_rows: 1,
+            return_field,
+            config_options: Arc::new(ConfigOptions::default()),
+            arg_fields: vec![],
+        };
+        match udf.invoke_with_args(args).unwrap() {
+            ColumnarValue::Array(arr) => {
+                arr.as_any().downcast_ref::<Int32Array>().unwrap().value(0)
+            }
+            _ => panic!("Expected array"),
+        }
+    }
+
+    fn ntz_array() -> ArrayRef {
+        Arc::new(TimestampMicrosecondArray::from(vec![Some(MICROS)]))
+    }
+
+    #[test]
+    fn timestamp_with_timezone_converts_to_session_timezone() {
+        // A timezone-aware timestamp stores the absolute UTC instant. In a Los Angeles session
+        // (UTC-8 in January) the local hour is 18 - 8 = 10.
+        let udf = SparkHour::new("America/Los_Angeles".to_string());
+        let array = TimestampMicrosecondArray::from(vec![Some(MICROS)]).with_timezone("UTC");
+        assert_eq!(invoke(&udf, Arc::new(array)), 10);
+    }
+
+    #[test]
+    fn timestamp_ntz_extracts_local_time_without_conversion() {
+        // TimestampNTZ stores local wall-clock time, so the hour is extracted directly (18) with
+        // no session timezone offset applied (issue #3180).
+        let udf = SparkHour::new("America/Los_Angeles".to_string());
+        assert_eq!(invoke(&udf, ntz_array()), 18);
+    }
+
+    #[test]
+    fn timestamp_ntz_result_is_independent_of_session_timezone() {
+        // The extracted hour must be the same regardless of the session timezone.
+        for tz in ["UTC", "America/Los_Angeles", "Asia/Tokyo"] {
+            let udf = SparkHour::new(tz.to_string());
+            assert_eq!(invoke(&udf, ntz_array()), 18);
+        }
+    }
+
+    #[test]
+    fn minute_and_second_on_timestamp_ntz() {
+        assert_eq!(
+            invoke(&SparkMinute::new("Asia/Tokyo".to_string()), ntz_array()),
+            30
+        );
+        assert_eq!(
+            invoke(&SparkSecond::new("Asia/Tokyo".to_string()), ntz_array()),
+            45
+        );
+    }
+
+    #[test]
+    fn is_timestamp_ntz_detects_plain_and_dictionary_wrapped() {
+        assert!(is_timestamp_ntz(&DataType::Timestamp(
+            TimeUnit::Microsecond,
+            None
+        )));
+        assert!(is_timestamp_ntz(&DataType::Dictionary(
+            Box::new(DataType::Int8),
+            Box::new(DataType::Timestamp(TimeUnit::Microsecond, None)),
+        )));
+        // Timezone-aware timestamps and dictionaries of them are not NTZ.
+        assert!(!is_timestamp_ntz(&DataType::Timestamp(
+            TimeUnit::Microsecond,
+            Some("UTC".into()),
+        )));
+        assert!(!is_timestamp_ntz(&DataType::Dictionary(
+            Box::new(DataType::Int8),
+            Box::new(DataType::Timestamp(
+                TimeUnit::Microsecond,
+                Some("UTC".into())
+            )),
+        )));
+    }
+
+    #[test]
+    fn timestamp_ntz_dictionary_input_extracts_local_time() {
+        use arrow::array::{DictionaryArray, Int8Array};
+        let values = Arc::new(TimestampMicrosecondArray::from(vec![Some(MICROS)])) as ArrayRef;
+        let keys = Int8Array::from(vec![0i8]);

Review Comment:
   Good catch, this matches the real production shape now. I switched the test 
to an `Int32`-keyed dictionary (Comet only emits `Int32` keys) so the input 
lines up with the declared `Dictionary(Int32, Int32)` return type, and it now 
asserts the result is still a `Dictionary(Int32, Int32)` and reads the value 
through the dictionary rather than casting it away. Done in 9848009.



##########
native/spark-expr/src/datetime_funcs/extract_date_part.rs:
##########
@@ -24,6 +24,17 @@ use datafusion::logical_expr::{
 };
 use std::fmt::Debug;
 
+/// Returns true when the type is a timestamp without a timezone (Spark's TimestampNTZType),
+/// including when wrapped in a dictionary. Such values store local wall-clock time and must not
+/// have any session timezone offset applied when extracting date parts.
+fn is_timestamp_ntz(data_type: &DataType) -> bool {

Review Comment:
   Agreed, good forward pointer. If #2649 (`date_trunc` NTZ handling) gets 
picked up, this `is_timestamp_ntz` predicate should carry over directly rather 
than being re-derived. Leaving it here for now since it's the only caller, but 
worth promoting to a shared helper when the second use case lands.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to