This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new c206389e [AURON #1520] Implement native function of hour, minute, 
second. (#1522)
c206389e is described below

commit c206389ee0e98769aedd16995228223c5cdad13a
Author: slfan1989 <[email protected]>
AuthorDate: Wed Nov 5 10:51:02 2025 +0800

    [AURON #1520] Implement native function of hour, minute, second. (#1522)
    
    Signed-off-by: slfan1989 <[email protected]>
    
    [AURON #1520] Implement native function of hour, minute, second.
---
 Cargo.lock                                         |   2 +
 native-engine/datafusion-ext-functions/Cargo.toml  |   2 +
 native-engine/datafusion-ext-functions/src/lib.rs  |   3 +
 .../datafusion-ext-functions/src/spark_dates.rs    | 435 ++++++++++++++++++++-
 .../apache/spark/sql/auron/AuronQuerySuite.scala   | 126 ++++++
 .../apache/spark/sql/auron/NativeConverters.scala  |  23 ++
 6 files changed, 587 insertions(+), 4 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0998d37d..9c1e4c5c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1218,6 +1218,8 @@ version = "0.1.0"
 dependencies = [
  "arrow",
  "auron-jni-bridge",
+ "chrono",
+ "chrono-tz",
  "datafusion",
  "datafusion-ext-commons",
  "itertools 0.14.0",
diff --git a/native-engine/datafusion-ext-functions/Cargo.toml 
b/native-engine/datafusion-ext-functions/Cargo.toml
index 98de05f8..8103c185 100644
--- a/native-engine/datafusion-ext-functions/Cargo.toml
+++ b/native-engine/datafusion-ext-functions/Cargo.toml
@@ -33,3 +33,5 @@ num = { workspace = true }
 paste = { workspace = true }
 serde_json = { workspace = true }
 sonic-rs = { workspace = true }
+chrono = "0.4.42"
+chrono-tz = "0.10.4"
diff --git a/native-engine/datafusion-ext-functions/src/lib.rs 
b/native-engine/datafusion-ext-functions/src/lib.rs
index 8c2f7560..96118e93 100644
--- a/native-engine/datafusion-ext-functions/src/lib.rs
+++ b/native-engine/datafusion-ext-functions/src/lib.rs
@@ -61,6 +61,9 @@ pub fn create_spark_ext_function(name: &str) -> 
Result<ScalarFunctionImplementat
         "Month" => Arc::new(spark_dates::spark_month),
         "Day" => Arc::new(spark_dates::spark_day),
         "Quarter" => Arc::new(spark_dates::spark_quarter),
+        "Hour" => Arc::new(spark_dates::spark_hour),
+        "Minute" => Arc::new(spark_dates::spark_minute),
+        "Second" => Arc::new(spark_dates::spark_second),
         "BrickhouseArrayUnion" => 
Arc::new(brickhouse::array_union::array_union),
         "Round" => Arc::new(spark_round::spark_round),
         "NormalizeNanAndZero" => {
diff --git a/native-engine/datafusion-ext-functions/src/spark_dates.rs 
b/native-engine/datafusion-ext-functions/src/spark_dates.rs
index 6221d9ab..f712f998 100644
--- a/native-engine/datafusion-ext-functions/src/spark_dates.rs
+++ b/native-engine/datafusion-ext-functions/src/spark_dates.rs
@@ -16,13 +16,21 @@
 use std::sync::Arc;
 
 use arrow::{
-    array::{ArrayRef, Int32Array},
+    array::{ArrayRef, Int32Array, TimestampMillisecondArray},
     compute::{DatePart, date_part},
-    datatypes::DataType,
+    datatypes::{DataType, TimeUnit},
+};
+use chrono::{TimeZone, Utc, prelude::*};
+use chrono_tz::Tz;
+use datafusion::{
+    common::{Result, ScalarValue},
+    physical_plan::ColumnarValue,
 };
-use datafusion::{common::Result, physical_plan::ColumnarValue};
 use datafusion_ext_commons::arrow::cast::cast;
 
+// ---- date parts on Date32 via Arrow's date_part
+// -----------------------------------------------
+
 pub fn spark_year(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     let input = cast(&args[0].clone().into_array(1)?, &DataType::Date32)?;
     Ok(ColumnarValue::Array(date_part(&input, DatePart::Year)?))
@@ -65,11 +73,136 @@ pub fn spark_quarter(args: &[ColumnarValue]) -> 
Result<ColumnarValue> {
     Ok(ColumnarValue::Array(Arc::new(quarter)))
 }
 
+// ---- timezone handling (custom, Spark-like)
+// ---------------------------------------------------
+
+/// Parse optional timezone (2nd argument) into `Option<Tz>`.
+fn parse_tz(args: &[ColumnarValue]) -> Option<Tz> {
+    if args.len() < 2 {
+        return None;
+    }
+    match &args[1] {
+        ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => 
s.parse::<Tz>().ok(),
+        _ => None,
+    }
+}
+
+/// Return the UTC offset in **seconds** for `epoch_ms` at the given `tz`
+/// (DST-aware).
+fn offset_seconds_at(tz: Tz, epoch_ms: i64) -> i32 {
+    // Convert epoch_ms to UTC DateTime, then ask the tz for local offset.
+    let dt_utc = Utc.timestamp_millis_opt(epoch_ms).single();
+    match dt_utc {
+        Some(dt) => tz
+            .offset_from_utc_datetime(&dt.naive_utc())
+            .fix()
+            .local_minus_utc(),
+        None => 0, // Gracefully return 0 on invalid inputs to avoid panic.
+    }
+}
+
+/// Extract hour/minute/second from a `TimestampMillisecondArray` with optional
+/// timezone. `which`: "hour" | "minute" | "second"
+fn extract_hms_with_tz(
+    ts: &TimestampMillisecondArray,
+    tz_opt: Option<Tz>,
+    which: &str,
+) -> Int32Array {
+    const MS_PER_SEC: i64 = 1000;
+    const MS_PER_MIN: i64 = 60 * MS_PER_SEC;
+    const MS_PER_HOUR: i64 = 60 * MS_PER_MIN;
+    const MS_PER_DAY: i64 = 24 * MS_PER_HOUR;
+
+    Int32Array::from_iter(ts.iter().map(|opt_ms| {
+        opt_ms.map(|epoch_ms| {
+            // Localize by applying tz offset in seconds (if provided).
+            let local_ms = if let Some(tz) = tz_opt {
+                let off_sec = offset_seconds_at(tz, epoch_ms) as i64;
+                epoch_ms + off_sec * MS_PER_SEC
+            } else {
+                epoch_ms // Treat as UTC when tz is None.
+            };
+
+            // Milliseconds within the day with positive modulo.
+            let mut day_ms = local_ms % MS_PER_DAY;
+            if day_ms < 0 {
+                day_ms += MS_PER_DAY;
+            }
+
+            match which {
+                "hour" => (day_ms / MS_PER_HOUR) as i32,
+                "minute" => ((day_ms % MS_PER_HOUR) / MS_PER_MIN) as i32,
+                "second" => ((day_ms % MS_PER_MIN) / MS_PER_SEC) as i32,
+                _ => unreachable!("which must be one of: hour | minute | 
second"),
+            }
+        })
+    }))
+}
+
+// ---- Spark-like hour/minute/second built on custom TZ logic
+// -----------------------------------
+
+/// Extract the HOUR component. We first cast any input to
+/// `Timestamp(Millisecond, None)` (to get the physical milliseconds) and then
+/// apply our own timezone/DST logic.
+pub fn spark_hour(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    let arr_ts_ms_none = cast(
+        &args[0].clone().into_array(1)?,
+        &DataType::Timestamp(TimeUnit::Millisecond, None),
+    )?;
+
+    let ts = arr_ts_ms_none
+        .as_any()
+        .downcast_ref::<TimestampMillisecondArray>()
+        .expect("internal cast to Timestamp(Millisecond, None) must succeed");
+
+    let tz = parse_tz(args);
+    Ok(ColumnarValue::Array(Arc::new(extract_hms_with_tz(
+        ts, tz, "hour",
+    ))))
+}
+
+/// Extract the MINUTE component (same approach as `spark_hour`).
+pub fn spark_minute(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    let arr_ts_ms_none = cast(
+        &args[0].clone().into_array(1)?,
+        &DataType::Timestamp(TimeUnit::Millisecond, None),
+    )?;
+
+    let ts = arr_ts_ms_none
+        .as_any()
+        .downcast_ref::<TimestampMillisecondArray>()
+        .expect("internal cast to Timestamp(Millisecond, None) must succeed");
+
+    let tz = parse_tz(args);
+    Ok(ColumnarValue::Array(Arc::new(extract_hms_with_tz(
+        ts, tz, "minute",
+    ))))
+}
+
+/// Extract the SECOND component (same approach as `spark_hour`).
+pub fn spark_second(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    let arr_ts_ms_none = cast(
+        &args[0].clone().into_array(1)?,
+        &DataType::Timestamp(TimeUnit::Millisecond, None),
+    )?;
+
+    let ts = arr_ts_ms_none
+        .as_any()
+        .downcast_ref::<TimestampMillisecondArray>()
+        .expect("internal cast to Timestamp(Millisecond, None) must succeed");
+
+    let tz = parse_tz(args);
+    Ok(ColumnarValue::Array(Arc::new(extract_hms_with_tz(
+        ts, tz, "second",
+    ))))
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
 
-    use arrow::array::{ArrayRef, Date32Array, Int32Array};
+    use arrow::array::{ArrayRef, Date32Array, Int32Array, 
TimestampMillisecondArray};
 
     use super::*;
 
@@ -171,4 +304,298 @@ mod tests {
         let out = spark_quarter(&args).unwrap().into_array(1).unwrap();
         assert_eq!(&out, &expected);
     }
+
+    #[inline]
+    fn arc_tz(s: &str) -> Option<Arc<str>> {
+        Some(Arc::<str>::from(s))
+    }
+
+    #[inline]
+    fn ms(h: i64, m: i64, s: i64) -> i64 {
+        (h * 3600 + m * 60 + s) * 1000
+    }
+
+    /// Build ms since epoch helper
+    fn hms_to_ms(h: i64, m: i64, s: i64) -> i64 {
+        (h * 3600 + m * 60 + s) * 1000
+    }
+
+    #[test]
+    fn test_spark_hour_minute_second_basic_from_ts() -> Result<()> {
+        // 0ms -> 1970-01-01 00:00:00 UTC
+        // 5025000ms -> 1970-01-01 01:23:45 UTC
+        let ts = Arc::new(TimestampMillisecondArray::from(vec![
+            Some(0),
+            Some(hms_to_ms(1, 23, 45)),
+            None,
+        ]));
+
+        let args = vec![ColumnarValue::Array(ts.clone())];
+
+        // hour()
+        let expected_h: ArrayRef = Arc::new(Int32Array::from(vec![Some(0), 
Some(1), None]));
+        let out_h = spark_hour(&args)?.into_array(1)?;
+        assert_eq!(&out_h, &expected_h);
+
+        // minute()
+        let expected_m: ArrayRef = Arc::new(Int32Array::from(vec![Some(0), 
Some(23), None]));
+        let out_m = spark_minute(&args)?.into_array(1)?;
+        assert_eq!(&out_m, &expected_m);
+
+        // second()
+        let expected_s: ArrayRef = Arc::new(Int32Array::from(vec![Some(0), 
Some(45), None]));
+        let out_s = spark_second(&args)?.into_array(1)?;
+        assert_eq!(&out_s, &expected_s);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_spark_timeparts_from_date32_midnight() -> Result<()> {
+        let d = Arc::new(Date32Array::from(vec![Some(0), Some(1), None]));
+        let args = vec![ColumnarValue::Array(d)];
+
+        // hour(date) -> 0
+        let expected_h: ArrayRef = Arc::new(Int32Array::from(vec![Some(0), 
Some(0), None]));
+        let out_h = spark_hour(&args)?.into_array(1)?;
+        assert_eq!(&out_h, &expected_h);
+
+        // minute(date) -> 0
+        let expected_m: ArrayRef = Arc::new(Int32Array::from(vec![Some(0), 
Some(0), None]));
+        let out_m = spark_minute(&args)?.into_array(1)?;
+        assert_eq!(&out_m, &expected_m);
+
+        // second(date) -> 0
+        let expected_s: ArrayRef = Arc::new(Int32Array::from(vec![Some(0), 
Some(0), None]));
+        let out_s = spark_second(&args)?.into_array(1)?;
+        assert_eq!(&out_s, &expected_s);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_spark_timeparts_scalar_vs_array_consistency() -> Result<()> {
+        // 1970-01-01 12:34:56 UTC
+        let ms = hms_to_ms(12, 34, 56);
+
+        // Scalar
+        let out_h_scalar = spark_hour(&[ColumnarValue::Scalar(
+            datafusion::common::ScalarValue::TimestampMillisecond(Some(ms), 
None),
+        )])?
+        .into_array(1)?;
+        let out_m_scalar = spark_minute(&[ColumnarValue::Scalar(
+            datafusion::common::ScalarValue::TimestampMillisecond(Some(ms), 
None),
+        )])?
+        .into_array(1)?;
+        let out_s_scalar = spark_second(&[ColumnarValue::Scalar(
+            datafusion::common::ScalarValue::TimestampMillisecond(Some(ms), 
None),
+        )])?
+        .into_array(1)?;
+
+        // Array
+        let arr = Arc::new(TimestampMillisecondArray::from(vec![Some(ms)]));
+        let out_h_array = 
spark_hour(&[ColumnarValue::Array(arr.clone())])?.into_array(1)?;
+        let out_m_array = 
spark_minute(&[ColumnarValue::Array(arr.clone())])?.into_array(1)?;
+        let out_s_array = 
spark_second(&[ColumnarValue::Array(arr)])?.into_array(1)?;
+
+        assert_eq!(&out_h_scalar, &out_h_array);
+        assert_eq!(&out_m_scalar, &out_m_array);
+        assert_eq!(&out_s_scalar, &out_s_array);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_spark_timeparts_pre_epoch_negative_ms() -> Result<()> {
+        let ts = Arc::new(TimestampMillisecondArray::from(vec![Some(-1000)]));
+        let args = vec![ColumnarValue::Array(ts)];
+
+        let expected_h: ArrayRef = Arc::new(Int32Array::from(vec![Some(23)]));
+        let expected_m: ArrayRef = Arc::new(Int32Array::from(vec![Some(59)]));
+        let expected_s: ArrayRef = Arc::new(Int32Array::from(vec![Some(59)]));
+
+        let out_h = spark_hour(&args)?.into_array(1)?;
+        let out_m = spark_minute(&args)?.into_array(1)?;
+        let out_s = spark_second(&args)?.into_array(1)?;
+
+        assert_eq!(&out_h, &expected_h);
+        assert_eq!(&out_m, &expected_m);
+        assert_eq!(&out_s, &expected_s);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_spark_timeparts_null_only() -> Result<()> {
+        let ts = Arc::new(TimestampMillisecondArray::from(vec![None, None]));
+        let args = vec![ColumnarValue::Array(ts)];
+
+        let expected: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
+
+        let out_h = spark_hour(&args)?.into_array(1)?;
+        let out_m = spark_minute(&args)?.into_array(1)?;
+        let out_s = spark_second(&args)?.into_array(1)?;
+
+        assert_eq!(&out_h, &expected);
+        assert_eq!(&out_m, &expected);
+        assert_eq!(&out_s, &expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_hour_utc_vs_shanghai() -> Result<()> {
+        // 1970-01-01 00:00:00 UTC
+        let ts = Arc::new(TimestampMillisecondArray::from(vec![Some(0)]));
+
+        // default (None) -> UTC
+        let out_utc = 
spark_hour(&[ColumnarValue::Array(ts.clone())])?.into_array(1)?;
+        let expected_utc: ArrayRef = Arc::new(Int32Array::from(vec![Some(0)]));
+        assert_eq!(&out_utc, &expected_utc);
+
+        let out_cst = spark_hour(&[
+            ColumnarValue::Array(ts.clone()),
+            
ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Shanghai".to_string()))),
+        ])?
+        .into_array(1)?;
+        let expected_cst: ArrayRef = Arc::new(Int32Array::from(vec![Some(8)]));
+        assert_eq!(&out_cst, &expected_cst);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_hour_scalar_vs_array_and_explicit_utc() -> Result<()> {
+        // 1970-01-01 12:34:56 UTC
+        let ts_ms = ms(12, 34, 56);
+
+        let out_scalar = 
spark_hour(&[ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
+            Some(ts_ms),
+            arc_tz("UTC"),
+        ))])?
+        .into_array(1)?;
+
+        // Array + default (None = UTC)
+        let arr = Arc::new(TimestampMillisecondArray::from(vec![Some(ts_ms)]));
+        let out_array = 
spark_hour(&[ColumnarValue::Array(arr)])?.into_array(1)?;
+
+        let expected: ArrayRef = Arc::new(Int32Array::from(vec![Some(12)]));
+        assert_eq!(&out_scalar, &expected);
+        assert_eq!(&out_array, &expected);
+        Ok(())
+    }
+
+    /// Helper: build epoch ms from a UTC calendar time.
+    fn utc_ms(y: i32, mo: u32, d: u32, h: u32, m: u32, s: u32) -> i64 {
+        // chrono 0.4: with_ymd_and_hms(...).single() for strictness
+        Utc.with_ymd_and_hms(y, mo, d, h, m, s)
+            .single()
+            .expect("valid UTC datetime")
+            .timestamp_millis()
+    }
+
+    #[test]
+    fn test_minute_second_utc_vs_shanghai_and_kolkata() -> Result<()> {
+        // epoch 0 -> 1970-01-01 00:00:00 UTC
+        let ts = Arc::new(TimestampMillisecondArray::from(vec![Some(0)]));
+
+        // Default (None) -> UTC
+        let out_min_utc = 
spark_minute(&[ColumnarValue::Array(ts.clone())])?.into_array(1)?;
+        let out_sec_utc = 
spark_second(&[ColumnarValue::Array(ts.clone())])?.into_array(1)?;
+
+        let expected_min_utc: ArrayRef = 
Arc::new(Int32Array::from(vec![Some(0)]));
+        let expected_sec_utc: ArrayRef = 
Arc::new(Int32Array::from(vec![Some(0)]));
+        assert_eq!(&out_min_utc, &expected_min_utc);
+        assert_eq!(&out_sec_utc, &expected_sec_utc);
+
+        // Asia/Shanghai (+08:00) -> 08:00:00 local => minute=0, second=0
+        let out_min_cst = spark_minute(&[
+            ColumnarValue::Array(ts.clone()),
+            
ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Shanghai".to_string()))),
+        ])?
+        .into_array(1)?;
+        let out_sec_cst = spark_second(&[
+            ColumnarValue::Array(ts.clone()),
+            
ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Shanghai".to_string()))),
+        ])?
+        .into_array(1)?;
+        let expected_min_cst: ArrayRef = 
Arc::new(Int32Array::from(vec![Some(0)]));
+        let expected_sec_cst: ArrayRef = 
Arc::new(Int32Array::from(vec![Some(0)]));
+        assert_eq!(&out_min_cst, &expected_min_cst);
+        assert_eq!(&out_sec_cst, &expected_sec_cst);
+
+        // Asia/Kolkata (+05:30) -> 05:30:00 local => minute=30, second=0
+        let out_min_kol = spark_minute(&[
+            ColumnarValue::Array(ts.clone()),
+            
ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Kolkata".to_string()))),
+        ])?
+        .into_array(1)?;
+        let out_sec_kol = spark_second(&[
+            ColumnarValue::Array(ts),
+            
ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Kolkata".to_string()))),
+        ])?
+        .into_array(1)?;
+        let expected_min_kol: ArrayRef = 
Arc::new(Int32Array::from(vec![Some(30)]));
+        let expected_sec_kol: ArrayRef = 
Arc::new(Int32Array::from(vec![Some(0)]));
+        assert_eq!(&out_min_kol, &expected_min_kol);
+        assert_eq!(&out_sec_kol, &expected_sec_kol);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_minute_second_nonwhole_offset_kathmandu() -> Result<()> {
+        // 1970-01-01 00:00:00 UTC -> Asia/Kathmandu was +05:30
+        let ts = Arc::new(TimestampMillisecondArray::from(vec![Some(0)]));
+
+        let out_min = spark_minute(&[
+            ColumnarValue::Array(ts.clone()),
+            
ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Kathmandu".to_string()))),
+        ])?
+        .into_array(1)?;
+        let out_sec = spark_second(&[
+            ColumnarValue::Array(ts),
+            
ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Kathmandu".to_string()))),
+        ])?
+        .into_array(1)?;
+
+        let expected_min: ArrayRef = 
Arc::new(Int32Array::from(vec![Some(30)]));
+        let expected_sec: ArrayRef = Arc::new(Int32Array::from(vec![Some(0)]));
+        assert_eq!(&out_min, &expected_min);
+        assert_eq!(&out_sec, &expected_sec);
+        Ok(())
+    }
+
+    #[test]
+    fn test_minute_second_dst_spring_forward_newyork() -> Result<()> {
+        // America/New_York DST starts on 2019-03-10.
+        // Local time jumps from 01:59:59 to 03:00:00 (02:00:00 - 02:59:59 
does not
+        // exist).
+
+        // 2019-03-10 06:59:59 UTC -> 01:59:59 local (EST, UTC-5)
+        let t1 = utc_ms(2019, 3, 10, 6, 59, 59);
+        // 2019-03-10 07:00:00 UTC -> 03:00:00 local (EDT, UTC-4)
+        let t2 = utc_ms(2019, 3, 10, 7, 0, 0);
+
+        let ts = Arc::new(TimestampMillisecondArray::from(vec![Some(t1), 
Some(t2)]));
+
+        let out_min = spark_minute(&[
+            ColumnarValue::Array(ts.clone()),
+            
ColumnarValue::Scalar(ScalarValue::Utf8(Some("America/New_York".to_string()))),
+        ])?
+        .into_array(2)?;
+        let out_sec = spark_second(&[
+            ColumnarValue::Array(ts.clone()),
+            
ColumnarValue::Scalar(ScalarValue::Utf8(Some("America/New_York".to_string()))),
+        ])?
+        .into_array(2)?;
+
+        let expected_min: ArrayRef = Arc::new(Int32Array::from(vec![Some(59), 
Some(0)]));
+        let expected_sec: ArrayRef = Arc::new(Int32Array::from(vec![Some(59), 
Some(0)]));
+
+        assert_eq!(&out_min, &expected_min);
+        assert_eq!(&out_sec, &expected_sec);
+
+        Ok(())
+    }
 }
diff --git 
a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronQuerySuite.scala
 
b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronQuerySuite.scala
index d36fd096..1d368493 100644
--- 
a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronQuerySuite.scala
+++ 
b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronQuerySuite.scala
@@ -374,4 +374,130 @@ class AuronQuerySuite
         checkAnswer(sql(q), Seq(expected))
     }
   }
+
+  test("test filter with hour function") {
+    withEnvConf("spark.auron.datetime.extract.enabled" -> "true") {
+      withTable("t_hour") {
+        sql("""
+            |create table t_hour using parquet as
+            |select to_timestamp('2024-12-18 01:23:45') as event_time union all
+            |select to_timestamp('2024-12-18 08:00:00') union all
+            |select to_timestamp('2024-12-18 08:59:59')
+            |""".stripMargin)
+
+        // Keep rows where HOUR >= 8, then group by hour
+        checkAnswer(
+          sql("""
+              |select h, count(*)
+              |from (select hour(event_time) as h from t_hour) t
+              |where h >= 8
+              |group by h
+              |order by h
+              |""".stripMargin),
+          Seq(Row(8, 2)))
+      }
+    }
+  }
+
+  test("test filter with minute function") {
+    withEnvConf("spark.auron.datetime.extract.enabled" -> "true") {
+      withTable("t_minute") {
+        sql("""
+            |create table t_minute using parquet as
+            |select to_timestamp('2024-12-18 00:00:00') as event_time union all
+            |select to_timestamp('2024-12-18 00:30:00') union all
+            |select to_timestamp('2024-12-18 12:30:59')
+            |""".stripMargin)
+
+        // Keep rows where MINUTE = 30, then group by minute
+        checkAnswer(
+          sql("""
+              |select m, count(*)
+              |from (select minute(event_time) as m from t_minute) t
+              |where m = 30
+              |group by m
+              |""".stripMargin),
+          Seq(Row(30, 2)))
+      }
+    }
+  }
+
+  test("test filter with second function") {
+    withEnvConf("spark.auron.datetime.extract.enabled" -> "true") {
+      withTable("t_second") {
+        sql("""
+            |create table t_second using parquet as
+            |select to_timestamp('2024-12-18 00:00:00') as event_time union all
+            |select to_timestamp('2024-12-18 01:23:00') union all
+            |select to_timestamp('2024-12-18 23:59:45')
+            |""".stripMargin)
+
+        // Keep rows where SECOND = 0, then group by second
+        checkAnswer(
+          sql("""
+              |select s, count(*)
+              |from (select second(event_time) as s from t_second) t
+              |where s = 0
+              |group by s
+              |""".stripMargin),
+          Seq(Row(0, 2)))
+      }
+    }
+  }
+
+  // For Date input: hour/minute/second should all be 0
+  test("timeparts on Date input return zeros") {
+    withEnvConf("spark.auron.datetime.extract.enabled" -> "true") {
+      withTable("t_date_parts") {
+        sql(
+          "create table t_date_parts using parquet as select date'2024-12-18' 
as d union all select date'2024-12-19'")
+        checkAnswer(
+          sql("""
+              |select
+              |  hour(d)   as h,
+              |  minute(d) as m,
+              |  second(d) as s
+              |from t_date_parts
+              |order by d
+              |""".stripMargin),
+          Seq(Row(0, 0, 0), Row(0, 0, 0)))
+      }
+    }
+  }
+
+  test("hour/minute/second respect timezone via from_utc_timestamp") {
+    withEnvConf("spark.auron.datetime.extract.enabled" -> "true") {
+      withTable("t_tz") {
+        // Construct: UTC 1970-01-01 00:00:00 → Asia/Shanghai => local 08:00:00
+        sql("""
+            |create table t_tz using parquet as
+            |select from_utc_timestamp(to_timestamp('1970-01-01 00:00:00'), 
'Asia/Shanghai') as ts
+            |""".stripMargin)
+
+        checkAnswer(
+          sql("""
+              |select hour(ts), minute(ts), second(ts)
+              |from t_tz
+              |""".stripMargin),
+          Seq(Row(8, 0, 0)))
+      }
+    }
+  }
+
+  test("minute/second with non-whole-hour offsets") {
+    withEnvConf("spark.auron.datetime.extract.enabled" -> "true") {
+      withTable("t_tz2") {
+        sql("""
+            |create table t_tz2 using parquet as
+            |select from_utc_timestamp(to_timestamp('2000-01-01 00:00:00'), 
'Asia/Kolkata')   as ts1,  -- +05:30
+            |       from_utc_timestamp(to_timestamp('2000-01-01 00:00:00'), 
'Asia/Kathmandu') as ts2   -- +05:45
+            |""".stripMargin)
+
+        // Kolkata -> 05:30:00; Kathmandu -> 05:45:00
+        checkAnswer(
+          sql("select minute(ts1), second(ts1), minute(ts2), second(ts2) from 
t_tz2"),
+          Seq(Row(30, 0, 45, 0)))
+      }
+    }
+  }
 }
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
index 1f9c82a9..e8447434 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
@@ -97,6 +97,8 @@ object NativeConverters extends Logging {
     AuronConverters.getBooleanConf("spark.auron.udf.brickhouse.enabled", 
defaultValue = true)
   def decimalArithOpEnabled: Boolean =
     AuronConverters.getBooleanConf("spark.auron.decimal.arithOp.enabled", 
defaultValue = false)
+  def datetimeExtractEnabled: Boolean =
+    AuronConverters.getBooleanConf("spark.auron.datetime.extract.enabled", 
defaultValue = false)
 
   def scalarTypeSupported(dataType: DataType): Boolean = {
     dataType match {
@@ -884,6 +886,13 @@ object NativeConverters extends Logging {
       case e: Levenshtein =>
         buildScalarFunction(pb.ScalarFunction.Levenshtein, e.children, 
e.dataType)
 
+      case e: Hour if datetimeExtractEnabled =>
+        buildTimePartExt("Hour", e.children.head, isPruningExpr, fallback)
+      case e: Minute if datetimeExtractEnabled =>
+        buildTimePartExt("Minute", e.children.head, isPruningExpr, fallback)
+      case e: Second if datetimeExtractEnabled =>
+        buildTimePartExt("Second", e.children.head, isPruningExpr, fallback)
+
       // startswith is converted to scalar function in pruning-expr mode
       case StartsWith(expr, Literal(prefix, StringType)) if isPruningExpr =>
         buildExprNode(
@@ -1290,6 +1299,20 @@ object NativeConverters extends Logging {
           .setReturnType(convertDataType(dataType)))
     }
 
+  private def buildTimePartExt(
+      name: String,
+      child: Expression,
+      isPruningExpr: Boolean,
+      fallback: Expression => pb.PhysicalExprNode): pb.PhysicalExprNode = {
+    val tzArg: Expression = child.dataType match {
+      case TimestampType =>
+        Literal(SQLConf.get.sessionLocalTimeZone, StringType)
+      case _ =>
+        Literal.create(null, StringType)
+    }
+    buildExtScalarFunctionNode(name, Seq(child, tzArg), IntegerType, 
isPruningExpr, fallback)
+  }
+
   def castIfNecessary(expr: Expression, dataType: DataType): Expression = {
     if (expr.dataType == dataType) {
       return expr

Reply via email to