This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new c206389e [AURON #1520] Implement native function of hour, minute, second. (#1522)
c206389e is described below
commit c206389ee0e98769aedd16995228223c5cdad13a
Author: slfan1989 <[email protected]>
AuthorDate: Wed Nov 5 10:51:02 2025 +0800
[AURON #1520] Implement native function of hour, minute, second. (#1522)
Signed-off-by: slfan1989 <[email protected]>
[AURON #1520] Implement native function of hour, minute, second.
---
Cargo.lock | 2 +
native-engine/datafusion-ext-functions/Cargo.toml | 2 +
native-engine/datafusion-ext-functions/src/lib.rs | 3 +
.../datafusion-ext-functions/src/spark_dates.rs | 435 ++++++++++++++++++++-
.../apache/spark/sql/auron/AuronQuerySuite.scala | 126 ++++++
.../apache/spark/sql/auron/NativeConverters.scala | 23 ++
6 files changed, 587 insertions(+), 4 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 0998d37d..9c1e4c5c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1218,6 +1218,8 @@ version = "0.1.0"
dependencies = [
"arrow",
"auron-jni-bridge",
+ "chrono",
+ "chrono-tz",
"datafusion",
"datafusion-ext-commons",
"itertools 0.14.0",
diff --git a/native-engine/datafusion-ext-functions/Cargo.toml
b/native-engine/datafusion-ext-functions/Cargo.toml
index 98de05f8..8103c185 100644
--- a/native-engine/datafusion-ext-functions/Cargo.toml
+++ b/native-engine/datafusion-ext-functions/Cargo.toml
@@ -33,3 +33,5 @@ num = { workspace = true }
paste = { workspace = true }
serde_json = { workspace = true }
sonic-rs = { workspace = true }
+chrono = "0.4.42"
+chrono-tz = "0.10.4"
diff --git a/native-engine/datafusion-ext-functions/src/lib.rs
b/native-engine/datafusion-ext-functions/src/lib.rs
index 8c2f7560..96118e93 100644
--- a/native-engine/datafusion-ext-functions/src/lib.rs
+++ b/native-engine/datafusion-ext-functions/src/lib.rs
@@ -61,6 +61,9 @@ pub fn create_spark_ext_function(name: &str) -> Result<ScalarFunctionImplementat
"Month" => Arc::new(spark_dates::spark_month),
"Day" => Arc::new(spark_dates::spark_day),
"Quarter" => Arc::new(spark_dates::spark_quarter),
+ "Hour" => Arc::new(spark_dates::spark_hour),
+ "Minute" => Arc::new(spark_dates::spark_minute),
+ "Second" => Arc::new(spark_dates::spark_second),
"BrickhouseArrayUnion" =>
Arc::new(brickhouse::array_union::array_union),
"Round" => Arc::new(spark_round::spark_round),
"NormalizeNanAndZero" => {
diff --git a/native-engine/datafusion-ext-functions/src/spark_dates.rs
b/native-engine/datafusion-ext-functions/src/spark_dates.rs
index 6221d9ab..f712f998 100644
--- a/native-engine/datafusion-ext-functions/src/spark_dates.rs
+++ b/native-engine/datafusion-ext-functions/src/spark_dates.rs
@@ -16,13 +16,21 @@
use std::sync::Arc;
use arrow::{
- array::{ArrayRef, Int32Array},
+ array::{ArrayRef, Int32Array, TimestampMillisecondArray},
compute::{DatePart, date_part},
- datatypes::DataType,
+ datatypes::{DataType, TimeUnit},
+};
+use chrono::{TimeZone, Utc, prelude::*};
+use chrono_tz::Tz;
+use datafusion::{
+ common::{Result, ScalarValue},
+ physical_plan::ColumnarValue,
};
-use datafusion::{common::Result, physical_plan::ColumnarValue};
use datafusion_ext_commons::arrow::cast::cast;
+// ---- date parts on Date32 via Arrow's date_part
+// -----------------------------------------------
+
pub fn spark_year(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let input = cast(&args[0].clone().into_array(1)?, &DataType::Date32)?;
Ok(ColumnarValue::Array(date_part(&input, DatePart::Year)?))
@@ -65,11 +73,136 @@ pub fn spark_quarter(args: &[ColumnarValue]) ->
Result<ColumnarValue> {
Ok(ColumnarValue::Array(Arc::new(quarter)))
}
+// ---- timezone handling (custom, Spark-like)
+// ---------------------------------------------------
+
+/// Parse optional timezone (2nd argument) into `Option<Tz>`.
+fn parse_tz(args: &[ColumnarValue]) -> Option<Tz> {
+ if args.len() < 2 {
+ return None;
+ }
+ match &args[1] {
+ ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => s.parse::<Tz>().ok(),
+ _ => None,
+ }
+}
+
+/// Return the UTC offset in **seconds** for `epoch_ms` at the given `tz`
+/// (DST-aware).
+fn offset_seconds_at(tz: Tz, epoch_ms: i64) -> i32 {
+ // Convert epoch_ms to UTC DateTime, then ask the tz for local offset.
+ let dt_utc = Utc.timestamp_millis_opt(epoch_ms).single();
+ match dt_utc {
+ Some(dt) => tz
+ .offset_from_utc_datetime(&dt.naive_utc())
+ .fix()
+ .local_minus_utc(),
+ None => 0, // Gracefully return 0 on invalid inputs to avoid panic.
+ }
+}
+
+/// Extract hour/minute/second from a `TimestampMillisecondArray` with optional
+/// timezone. `which`: "hour" | "minute" | "second"
+fn extract_hms_with_tz(
+ ts: &TimestampMillisecondArray,
+ tz_opt: Option<Tz>,
+ which: &str,
+) -> Int32Array {
+ const MS_PER_SEC: i64 = 1000;
+ const MS_PER_MIN: i64 = 60 * MS_PER_SEC;
+ const MS_PER_HOUR: i64 = 60 * MS_PER_MIN;
+ const MS_PER_DAY: i64 = 24 * MS_PER_HOUR;
+
+ Int32Array::from_iter(ts.iter().map(|opt_ms| {
+ opt_ms.map(|epoch_ms| {
+ // Localize by applying tz offset in seconds (if provided).
+ let local_ms = if let Some(tz) = tz_opt {
+ let off_sec = offset_seconds_at(tz, epoch_ms) as i64;
+ epoch_ms + off_sec * MS_PER_SEC
+ } else {
+ epoch_ms // Treat as UTC when tz is None.
+ };
+
+ // Milliseconds within the day with positive modulo.
+ let mut day_ms = local_ms % MS_PER_DAY;
+ if day_ms < 0 {
+ day_ms += MS_PER_DAY;
+ }
+
+ match which {
+ "hour" => (day_ms / MS_PER_HOUR) as i32,
+ "minute" => ((day_ms % MS_PER_HOUR) / MS_PER_MIN) as i32,
+ "second" => ((day_ms % MS_PER_MIN) / MS_PER_SEC) as i32,
+ _ => unreachable!("which must be one of: hour | minute | second"),
+ }
+ })
+ }))
+}
+
+// ---- Spark-like hour/minute/second built on custom TZ logic
+// -----------------------------------
+
+/// Extract the HOUR component. We first cast any input to
+/// `Timestamp(Millisecond, None)` (to get the physical milliseconds) and then
+/// apply our own timezone/DST logic.
+pub fn spark_hour(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ let arr_ts_ms_none = cast(
+ &args[0].clone().into_array(1)?,
+ &DataType::Timestamp(TimeUnit::Millisecond, None),
+ )?;
+
+ let ts = arr_ts_ms_none
+ .as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .expect("internal cast to Timestamp(Millisecond, None) must succeed");
+
+ let tz = parse_tz(args);
+ Ok(ColumnarValue::Array(Arc::new(extract_hms_with_tz(
+ ts, tz, "hour",
+ ))))
+}
+
+/// Extract the MINUTE component (same approach as `spark_hour`).
+pub fn spark_minute(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ let arr_ts_ms_none = cast(
+ &args[0].clone().into_array(1)?,
+ &DataType::Timestamp(TimeUnit::Millisecond, None),
+ )?;
+
+ let ts = arr_ts_ms_none
+ .as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .expect("internal cast to Timestamp(Millisecond, None) must succeed");
+
+ let tz = parse_tz(args);
+ Ok(ColumnarValue::Array(Arc::new(extract_hms_with_tz(
+ ts, tz, "minute",
+ ))))
+}
+
+/// Extract the SECOND component (same approach as `spark_hour`).
+pub fn spark_second(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ let arr_ts_ms_none = cast(
+ &args[0].clone().into_array(1)?,
+ &DataType::Timestamp(TimeUnit::Millisecond, None),
+ )?;
+
+ let ts = arr_ts_ms_none
+ .as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .expect("internal cast to Timestamp(Millisecond, None) must succeed");
+
+ let tz = parse_tz(args);
+ Ok(ColumnarValue::Array(Arc::new(extract_hms_with_tz(
+ ts, tz, "second",
+ ))))
+}
+
#[cfg(test)]
mod tests {
use std::sync::Arc;
- use arrow::array::{ArrayRef, Date32Array, Int32Array};
+ use arrow::array::{ArrayRef, Date32Array, Int32Array,
TimestampMillisecondArray};
use super::*;
@@ -171,4 +304,298 @@ mod tests {
let out = spark_quarter(&args).unwrap().into_array(1).unwrap();
assert_eq!(&out, &expected);
}
+
+ #[inline]
+ fn arc_tz(s: &str) -> Option<Arc<str>> {
+ Some(Arc::<str>::from(s))
+ }
+
+ #[inline]
+ fn ms(h: i64, m: i64, s: i64) -> i64 {
+ (h * 3600 + m * 60 + s) * 1000
+ }
+
+ /// Build ms since epoch helper
+ fn hms_to_ms(h: i64, m: i64, s: i64) -> i64 {
+ (h * 3600 + m * 60 + s) * 1000
+ }
+
+ #[test]
+ fn test_spark_hour_minute_second_basic_from_ts() -> Result<()> {
+ // 0ms -> 1970-01-01 00:00:00 UTC
+ // 5025000ms -> 1970-01-01 01:23:45 UTC
+ let ts = Arc::new(TimestampMillisecondArray::from(vec![
+ Some(0),
+ Some(hms_to_ms(1, 23, 45)),
+ None,
+ ]));
+
+ let args = vec![ColumnarValue::Array(ts.clone())];
+
+ // hour()
+ let expected_h: ArrayRef = Arc::new(Int32Array::from(vec![Some(0),
Some(1), None]));
+ let out_h = spark_hour(&args)?.into_array(1)?;
+ assert_eq!(&out_h, &expected_h);
+
+ // minute()
+ let expected_m: ArrayRef = Arc::new(Int32Array::from(vec![Some(0),
Some(23), None]));
+ let out_m = spark_minute(&args)?.into_array(1)?;
+ assert_eq!(&out_m, &expected_m);
+
+ // second()
+ let expected_s: ArrayRef = Arc::new(Int32Array::from(vec![Some(0),
Some(45), None]));
+ let out_s = spark_second(&args)?.into_array(1)?;
+ assert_eq!(&out_s, &expected_s);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_spark_timeparts_from_date32_midnight() -> Result<()> {
+ let d = Arc::new(Date32Array::from(vec![Some(0), Some(1), None]));
+ let args = vec![ColumnarValue::Array(d)];
+
+ // hour(date) -> 0
+ let expected_h: ArrayRef = Arc::new(Int32Array::from(vec![Some(0),
Some(0), None]));
+ let out_h = spark_hour(&args)?.into_array(1)?;
+ assert_eq!(&out_h, &expected_h);
+
+ // minute(date) -> 0
+ let expected_m: ArrayRef = Arc::new(Int32Array::from(vec![Some(0),
Some(0), None]));
+ let out_m = spark_minute(&args)?.into_array(1)?;
+ assert_eq!(&out_m, &expected_m);
+
+ // second(date) -> 0
+ let expected_s: ArrayRef = Arc::new(Int32Array::from(vec![Some(0),
Some(0), None]));
+ let out_s = spark_second(&args)?.into_array(1)?;
+ assert_eq!(&out_s, &expected_s);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_spark_timeparts_scalar_vs_array_consistency() -> Result<()> {
+ // 1970-01-01 12:34:56 UTC
+ let ms = hms_to_ms(12, 34, 56);
+
+ // Scalar
+ let out_h_scalar = spark_hour(&[ColumnarValue::Scalar(
+ datafusion::common::ScalarValue::TimestampMillisecond(Some(ms),
None),
+ )])?
+ .into_array(1)?;
+ let out_m_scalar = spark_minute(&[ColumnarValue::Scalar(
+ datafusion::common::ScalarValue::TimestampMillisecond(Some(ms),
None),
+ )])?
+ .into_array(1)?;
+ let out_s_scalar = spark_second(&[ColumnarValue::Scalar(
+ datafusion::common::ScalarValue::TimestampMillisecond(Some(ms),
None),
+ )])?
+ .into_array(1)?;
+
+ // Array
+ let arr = Arc::new(TimestampMillisecondArray::from(vec![Some(ms)]));
+ let out_h_array =
spark_hour(&[ColumnarValue::Array(arr.clone())])?.into_array(1)?;
+ let out_m_array =
spark_minute(&[ColumnarValue::Array(arr.clone())])?.into_array(1)?;
+ let out_s_array =
spark_second(&[ColumnarValue::Array(arr)])?.into_array(1)?;
+
+ assert_eq!(&out_h_scalar, &out_h_array);
+ assert_eq!(&out_m_scalar, &out_m_array);
+ assert_eq!(&out_s_scalar, &out_s_array);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_spark_timeparts_pre_epoch_negative_ms() -> Result<()> {
+ let ts = Arc::new(TimestampMillisecondArray::from(vec![Some(-1000)]));
+ let args = vec![ColumnarValue::Array(ts)];
+
+ let expected_h: ArrayRef = Arc::new(Int32Array::from(vec![Some(23)]));
+ let expected_m: ArrayRef = Arc::new(Int32Array::from(vec![Some(59)]));
+ let expected_s: ArrayRef = Arc::new(Int32Array::from(vec![Some(59)]));
+
+ let out_h = spark_hour(&args)?.into_array(1)?;
+ let out_m = spark_minute(&args)?.into_array(1)?;
+ let out_s = spark_second(&args)?.into_array(1)?;
+
+ assert_eq!(&out_h, &expected_h);
+ assert_eq!(&out_m, &expected_m);
+ assert_eq!(&out_s, &expected_s);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_spark_timeparts_null_only() -> Result<()> {
+ let ts = Arc::new(TimestampMillisecondArray::from(vec![None, None]));
+ let args = vec![ColumnarValue::Array(ts)];
+
+ let expected: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
+
+ let out_h = spark_hour(&args)?.into_array(1)?;
+ let out_m = spark_minute(&args)?.into_array(1)?;
+ let out_s = spark_second(&args)?.into_array(1)?;
+
+ assert_eq!(&out_h, &expected);
+ assert_eq!(&out_m, &expected);
+ assert_eq!(&out_s, &expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_hour_utc_vs_shanghai() -> Result<()> {
+ // 1970-01-01 00:00:00 UTC
+ let ts = Arc::new(TimestampMillisecondArray::from(vec![Some(0)]));
+
+ // default (None) -> UTC
+ let out_utc =
spark_hour(&[ColumnarValue::Array(ts.clone())])?.into_array(1)?;
+ let expected_utc: ArrayRef = Arc::new(Int32Array::from(vec![Some(0)]));
+ assert_eq!(&out_utc, &expected_utc);
+
+ let out_cst = spark_hour(&[
+ ColumnarValue::Array(ts.clone()),
+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Shanghai".to_string()))),
+ ])?
+ .into_array(1)?;
+ let expected_cst: ArrayRef = Arc::new(Int32Array::from(vec![Some(8)]));
+ assert_eq!(&out_cst, &expected_cst);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_hour_scalar_vs_array_and_explicit_utc() -> Result<()> {
+ // 1970-01-01 12:34:56 UTC
+ let ts_ms = ms(12, 34, 56);
+
+ let out_scalar =
spark_hour(&[ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
+ Some(ts_ms),
+ arc_tz("UTC"),
+ ))])?
+ .into_array(1)?;
+
+ // Array + default (None = UTC)
+ let arr = Arc::new(TimestampMillisecondArray::from(vec![Some(ts_ms)]));
+ let out_array =
spark_hour(&[ColumnarValue::Array(arr)])?.into_array(1)?;
+
+ let expected: ArrayRef = Arc::new(Int32Array::from(vec![Some(12)]));
+ assert_eq!(&out_scalar, &expected);
+ assert_eq!(&out_array, &expected);
+ Ok(())
+ }
+
+ /// Helper: build epoch ms from a UTC calendar time.
+ fn utc_ms(y: i32, mo: u32, d: u32, h: u32, m: u32, s: u32) -> i64 {
+ // chrono 0.4: with_ymd_and_hms(...).single() for strictness
+ Utc.with_ymd_and_hms(y, mo, d, h, m, s)
+ .single()
+ .expect("valid UTC datetime")
+ .timestamp_millis()
+ }
+
+ #[test]
+ fn test_minute_second_utc_vs_shanghai_and_kolkata() -> Result<()> {
+ // epoch 0 -> 1970-01-01 00:00:00 UTC
+ let ts = Arc::new(TimestampMillisecondArray::from(vec![Some(0)]));
+
+ // Default (None) -> UTC
+ let out_min_utc =
spark_minute(&[ColumnarValue::Array(ts.clone())])?.into_array(1)?;
+ let out_sec_utc =
spark_second(&[ColumnarValue::Array(ts.clone())])?.into_array(1)?;
+
+ let expected_min_utc: ArrayRef =
Arc::new(Int32Array::from(vec![Some(0)]));
+ let expected_sec_utc: ArrayRef =
Arc::new(Int32Array::from(vec![Some(0)]));
+ assert_eq!(&out_min_utc, &expected_min_utc);
+ assert_eq!(&out_sec_utc, &expected_sec_utc);
+
+ // Asia/Shanghai (+08:00) -> 08:00:00 local => minute=0, second=0
+ let out_min_cst = spark_minute(&[
+ ColumnarValue::Array(ts.clone()),
+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Shanghai".to_string()))),
+ ])?
+ .into_array(1)?;
+ let out_sec_cst = spark_second(&[
+ ColumnarValue::Array(ts.clone()),
+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Shanghai".to_string()))),
+ ])?
+ .into_array(1)?;
+ let expected_min_cst: ArrayRef =
Arc::new(Int32Array::from(vec![Some(0)]));
+ let expected_sec_cst: ArrayRef =
Arc::new(Int32Array::from(vec![Some(0)]));
+ assert_eq!(&out_min_cst, &expected_min_cst);
+ assert_eq!(&out_sec_cst, &expected_sec_cst);
+
+ // Asia/Kolkata (+05:30) -> 05:30:00 local => minute=30, second=0
+ let out_min_kol = spark_minute(&[
+ ColumnarValue::Array(ts.clone()),
+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Kolkata".to_string()))),
+ ])?
+ .into_array(1)?;
+ let out_sec_kol = spark_second(&[
+ ColumnarValue::Array(ts),
+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Kolkata".to_string()))),
+ ])?
+ .into_array(1)?;
+ let expected_min_kol: ArrayRef =
Arc::new(Int32Array::from(vec![Some(30)]));
+ let expected_sec_kol: ArrayRef =
Arc::new(Int32Array::from(vec![Some(0)]));
+ assert_eq!(&out_min_kol, &expected_min_kol);
+ assert_eq!(&out_sec_kol, &expected_sec_kol);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_minute_second_nonwhole_offset_kathmandu() -> Result<()> {
+ // 1970-01-01 00:00:00 UTC -> Asia/Kathmandu was +05:30
+ let ts = Arc::new(TimestampMillisecondArray::from(vec![Some(0)]));
+
+ let out_min = spark_minute(&[
+ ColumnarValue::Array(ts.clone()),
+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Kathmandu".to_string()))),
+ ])?
+ .into_array(1)?;
+ let out_sec = spark_second(&[
+ ColumnarValue::Array(ts),
+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("Asia/Kathmandu".to_string()))),
+ ])?
+ .into_array(1)?;
+
+ let expected_min: ArrayRef =
Arc::new(Int32Array::from(vec![Some(30)]));
+ let expected_sec: ArrayRef = Arc::new(Int32Array::from(vec![Some(0)]));
+ assert_eq!(&out_min, &expected_min);
+ assert_eq!(&out_sec, &expected_sec);
+ Ok(())
+ }
+
+ #[test]
+ fn test_minute_second_dst_spring_forward_newyork() -> Result<()> {
+ // America/New_York DST starts on 2019-03-10.
+ // Local time jumps from 01:59:59 to 03:00:00 (02:00:00 - 02:59:59 does not
+ // exist).
+
+ // 2019-03-10 06:59:59 UTC -> 01:59:59 local (EST, UTC-5)
+ let t1 = utc_ms(2019, 3, 10, 6, 59, 59);
+ // 2019-03-10 07:00:00 UTC -> 03:00:00 local (EDT, UTC-4)
+ let t2 = utc_ms(2019, 3, 10, 7, 0, 0);
+
+ let ts = Arc::new(TimestampMillisecondArray::from(vec![Some(t1),
Some(t2)]));
+
+ let out_min = spark_minute(&[
+ ColumnarValue::Array(ts.clone()),
+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("America/New_York".to_string()))),
+ ])?
+ .into_array(2)?;
+ let out_sec = spark_second(&[
+ ColumnarValue::Array(ts.clone()),
+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("America/New_York".to_string()))),
+ ])?
+ .into_array(2)?;
+
+ let expected_min: ArrayRef = Arc::new(Int32Array::from(vec![Some(59),
Some(0)]));
+ let expected_sec: ArrayRef = Arc::new(Int32Array::from(vec![Some(59),
Some(0)]));
+
+ assert_eq!(&out_min, &expected_min);
+ assert_eq!(&out_sec, &expected_sec);
+
+ Ok(())
+ }
}
diff --git
a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronQuerySuite.scala
b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronQuerySuite.scala
index d36fd096..1d368493 100644
---
a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronQuerySuite.scala
+++
b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/auron/AuronQuerySuite.scala
@@ -374,4 +374,130 @@ class AuronQuerySuite
checkAnswer(sql(q), Seq(expected))
}
}
+
+ test("test filter with hour function") {
+ withEnvConf("spark.auron.datetime.extract.enabled" -> "true") {
+ withTable("t_hour") {
+ sql("""
+ |create table t_hour using parquet as
+ |select to_timestamp('2024-12-18 01:23:45') as event_time union all
+ |select to_timestamp('2024-12-18 08:00:00') union all
+ |select to_timestamp('2024-12-18 08:59:59')
+ |""".stripMargin)
+
+ // Keep rows where HOUR >= 8, then group by hour
+ checkAnswer(
+ sql("""
+ |select h, count(*)
+ |from (select hour(event_time) as h from t_hour) t
+ |where h >= 8
+ |group by h
+ |order by h
+ |""".stripMargin),
+ Seq(Row(8, 2)))
+ }
+ }
+ }
+
+ test("test filter with minute function") {
+ withEnvConf("spark.auron.datetime.extract.enabled" -> "true") {
+ withTable("t_minute") {
+ sql("""
+ |create table t_minute using parquet as
+ |select to_timestamp('2024-12-18 00:00:00') as event_time union all
+ |select to_timestamp('2024-12-18 00:30:00') union all
+ |select to_timestamp('2024-12-18 12:30:59')
+ |""".stripMargin)
+
+ // Keep rows where MINUTE = 30, then group by minute
+ checkAnswer(
+ sql("""
+ |select m, count(*)
+ |from (select minute(event_time) as m from t_minute) t
+ |where m = 30
+ |group by m
+ |""".stripMargin),
+ Seq(Row(30, 2)))
+ }
+ }
+ }
+
+ test("test filter with second function") {
+ withEnvConf("spark.auron.datetime.extract.enabled" -> "true") {
+ withTable("t_second") {
+ sql("""
+ |create table t_second using parquet as
+ |select to_timestamp('2024-12-18 00:00:00') as event_time union all
+ |select to_timestamp('2024-12-18 01:23:00') union all
+ |select to_timestamp('2024-12-18 23:59:45')
+ |""".stripMargin)
+
+ // Keep rows where SECOND = 0, then group by second
+ checkAnswer(
+ sql("""
+ |select s, count(*)
+ |from (select second(event_time) as s from t_second) t
+ |where s = 0
+ |group by s
+ |""".stripMargin),
+ Seq(Row(0, 2)))
+ }
+ }
+ }
+
+ // For Date input: hour/minute/second should all be 0
+ test("timeparts on Date input return zeros") {
+ withEnvConf("spark.auron.datetime.extract.enabled" -> "true") {
+ withTable("t_date_parts") {
+ sql(
+ "create table t_date_parts using parquet as select date'2024-12-18'
as d union all select date'2024-12-19'")
+ checkAnswer(
+ sql("""
+ |select
+ | hour(d) as h,
+ | minute(d) as m,
+ | second(d) as s
+ |from t_date_parts
+ |order by d
+ |""".stripMargin),
+ Seq(Row(0, 0, 0), Row(0, 0, 0)))
+ }
+ }
+ }
+
+ test("hour/minute/second respect timezone via from_utc_timestamp") {
+ withEnvConf("spark.auron.datetime.extract.enabled" -> "true") {
+ withTable("t_tz") {
+ // Construct: UTC 1970-01-01 00:00:00 → Asia/Shanghai => local 08:00:00
+ sql("""
+ |create table t_tz using parquet as
+ |select from_utc_timestamp(to_timestamp('1970-01-01 00:00:00'),
'Asia/Shanghai') as ts
+ |""".stripMargin)
+
+ checkAnswer(
+ sql("""
+ |select hour(ts), minute(ts), second(ts)
+ |from t_tz
+ |""".stripMargin),
+ Seq(Row(8, 0, 0)))
+ }
+ }
+ }
+
+ test("minute/second with non-whole-hour offsets") {
+ withEnvConf("spark.auron.datetime.extract.enabled" -> "true") {
+ withTable("t_tz2") {
+ sql("""
+ |create table t_tz2 using parquet as
+ |select from_utc_timestamp(to_timestamp('2000-01-01 00:00:00'),
'Asia/Kolkata') as ts1, -- +05:30
+ | from_utc_timestamp(to_timestamp('2000-01-01 00:00:00'),
'Asia/Kathmandu') as ts2 -- +05:45
+ |""".stripMargin)
+
+ // Kolkata -> 05:30:00; Kathmandu -> 05:45:00
+ checkAnswer(
+ sql("select minute(ts1), second(ts1), minute(ts2), second(ts2) from
t_tz2"),
+ Seq(Row(30, 0, 45, 0)))
+ }
+ }
+ }
}
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
index 1f9c82a9..e8447434 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
@@ -97,6 +97,8 @@ object NativeConverters extends Logging {
AuronConverters.getBooleanConf("spark.auron.udf.brickhouse.enabled",
defaultValue = true)
def decimalArithOpEnabled: Boolean =
AuronConverters.getBooleanConf("spark.auron.decimal.arithOp.enabled",
defaultValue = false)
+ def datetimeExtractEnabled: Boolean =
+ AuronConverters.getBooleanConf("spark.auron.datetime.extract.enabled",
defaultValue = false)
def scalarTypeSupported(dataType: DataType): Boolean = {
dataType match {
@@ -884,6 +886,13 @@ object NativeConverters extends Logging {
case e: Levenshtein =>
buildScalarFunction(pb.ScalarFunction.Levenshtein, e.children,
e.dataType)
+ case e: Hour if datetimeExtractEnabled =>
+ buildTimePartExt("Hour", e.children.head, isPruningExpr, fallback)
+ case e: Minute if datetimeExtractEnabled =>
+ buildTimePartExt("Minute", e.children.head, isPruningExpr, fallback)
+ case e: Second if datetimeExtractEnabled =>
+ buildTimePartExt("Second", e.children.head, isPruningExpr, fallback)
+
// startswith is converted to scalar function in pruning-expr mode
case StartsWith(expr, Literal(prefix, StringType)) if isPruningExpr =>
buildExprNode(
@@ -1290,6 +1299,20 @@ object NativeConverters extends Logging {
.setReturnType(convertDataType(dataType)))
}
+ private def buildTimePartExt(
+ name: String,
+ child: Expression,
+ isPruningExpr: Boolean,
+ fallback: Expression => pb.PhysicalExprNode): pb.PhysicalExprNode = {
+ val tzArg: Expression = child.dataType match {
+ case TimestampType =>
+ Literal(SQLConf.get.sessionLocalTimeZone, StringType)
+ case _ =>
+ Literal.create(null, StringType)
+ }
+ buildExtScalarFunctionNode(name, Seq(child, tzArg), IntegerType,
isPruningExpr, fallback)
+ }
+
def castIfNecessary(expr: Expression, dataType: DataType): Expression = {
if (expr.dataType == dataType) {
return expr