This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e832f2c [AURON #2128] Implement native function of dayofweek (#2129)
0e832f2c is described below

commit 0e832f2cb9298a0c99bba5e5e261f6f518a974ed
Author: Ming Wei <[email protected]>
AuthorDate: Wed Apr 1 05:50:35 2026 +0800

    [AURON #2128] Implement native function of dayofweek (#2129)
    
    # Which issue does this PR close?
    
    Closes https://github.com/apache/auron/issues/2128
    
    # Rationale for this change
    To achieve full compatibility with Spark’s date functions, we should
    implement `dayofweek()` with the following characteristics:
    
    _Expected behavior_
    
    Function name: `dayofweek(expr)`
    Return value: `Sunday = 1, Monday = 2, ..., Saturday = 7`
    Example:
    
    `dayofweek('2009-07-30')` → `5`
    
    Supports: `DATE`, `TIMESTAMP`, and compatible string/date inputs,
    consistent with the existing date extraction functions
    
    Null-safe: should return `NULL` if input is `NULL`
    Array and scalar inputs: consistent with current date extraction
    function implementations
    
    
    # What changes are included in this PR?
    This PR adds native support for the `dayofweek()` function with
    Spark-compatible semantics.
    The following changes are included:
    
    - Added native implementation of `spark_dayofweek()` in the expression
    layer.
    - Added `DayOfWeek` expression support in `NativeConverters` for proper
    Spark → native translation.
    - Added unit tests to verify correctness.
    
    # Are there any user-facing changes?
    No.
    
    # How was this patch tested?
    CI.
    
    Signed-off-by: weimingdiit <[email protected]>
---
 native-engine/datafusion-ext-functions/src/lib.rs  |   1 +
 .../datafusion-ext-functions/src/spark_dates.rs    |  51 ++++++-
 .../org/apache/auron/AuronFunctionSuite.scala      |  27 ++++
 .../apache/spark/sql/auron/NativeConverters.scala  | 146 +++++++++++----------
 4 files changed, 158 insertions(+), 67 deletions(-)

diff --git a/native-engine/datafusion-ext-functions/src/lib.rs 
b/native-engine/datafusion-ext-functions/src/lib.rs
index 9464722e..4b636a40 100644
--- a/native-engine/datafusion-ext-functions/src/lib.rs
+++ b/native-engine/datafusion-ext-functions/src/lib.rs
@@ -74,6 +74,7 @@ pub fn create_auron_ext_function(
         "Spark_Year" => Arc::new(spark_dates::spark_year),
         "Spark_Month" => Arc::new(spark_dates::spark_month),
         "Spark_Day" => Arc::new(spark_dates::spark_day),
+        "Spark_DayOfWeek" => Arc::new(spark_dates::spark_dayofweek),
         "Spark_Quarter" => Arc::new(spark_dates::spark_quarter),
         "Spark_Hour" => Arc::new(spark_dates::spark_hour),
         "Spark_Minute" => Arc::new(spark_dates::spark_minute),
diff --git a/native-engine/datafusion-ext-functions/src/spark_dates.rs 
b/native-engine/datafusion-ext-functions/src/spark_dates.rs
index 800b0e53..08ec40a3 100644
--- a/native-engine/datafusion-ext-functions/src/spark_dates.rs
+++ b/native-engine/datafusion-ext-functions/src/spark_dates.rs
@@ -16,7 +16,7 @@
 use std::sync::Arc;
 
 use arrow::{
-    array::{ArrayRef, Int32Array, TimestampMillisecondArray},
+    array::{ArrayRef, Date32Array, Int32Array, TimestampMillisecondArray},
     compute::{DatePart, date_part},
     datatypes::{DataType, TimeUnit},
 };
@@ -46,6 +46,32 @@ pub fn spark_day(args: &[ColumnarValue]) -> 
Result<ColumnarValue> {
     Ok(ColumnarValue::Array(date_part(&input, DatePart::Day)?))
 }
 
+/// `spark_dayofweek(date/timestamp/compatible-string)`
+///
+/// Matches Spark's `dayofweek()` semantics:
+/// Sunday = 1, Monday = 2, ..., Saturday = 7.
+pub fn spark_dayofweek(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    let input = cast(&args[0].clone().into_array(1)?, &DataType::Date32)?;
+    let input = input
+        .as_any()
+        .downcast_ref::<Date32Array>()
+        .expect("internal cast to Date32 must succeed");
+
+    // Date32 is days since 1970-01-01. 1970-01-01 is a Thursday.
+    // If we number weekdays so that Sunday = 0, ..., Saturday = 6,
+    // then 1970-01-01 corresponds to 4. For an offset `days`,
+    // weekday_index = (days + 4) mod 7 gives 0 = Sunday, ..., 6 = Saturday.
+    // Spark wants Sunday = 1, ..., Saturday = 7, so we add 1.
+    let dayofweek = Int32Array::from_iter(input.iter().map(|opt_days| {
+        opt_days.map(|days| {
+            let weekday_index = (days as i64 + 4).rem_euclid(7);
+            weekday_index as i32 + 1
+        })
+    }));
+
+    Ok(ColumnarValue::Array(Arc::new(dayofweek)))
+}
+
 /// `spark_quarter(date/timestamp/compatible-string)`
 ///
 /// Simulates Spark's `quarter()` function.
@@ -258,6 +284,29 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_spark_dayofweek() -> Result<()> {
+        let input = Arc::new(Date32Array::from(vec![
+            Some(-1),
+            Some(0),
+            Some(2),
+            Some(3),
+            Some(4),
+            None,
+        ]));
+        let args = vec![ColumnarValue::Array(input)];
+        let expected_ret: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(4),
+            Some(5),
+            Some(7),
+            Some(1),
+            Some(2),
+            None,
+        ]));
+        assert_eq!(&spark_dayofweek(&args)?.into_array(1)?, &expected_ret);
+        Ok(())
+    }
+
     #[test]
     fn test_spark_quarter_basic() -> Result<()> {
         // Date32 days relative to 1970-01-01:
diff --git 
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala
 
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala
index 06e564a9..442a6f51 100644
--- 
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala
+++ 
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.auron
 import java.text.SimpleDateFormat
 
 import org.apache.spark.sql.{AuronQueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.auron.util.AuronTestUtils
 
@@ -117,6 +118,32 @@ class AuronFunctionSuite extends AuronQueryTest with 
BaseAuronSQLSuite {
     }
   }
 
+  test("dayofweek function") {
+    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+      withTable("t1") {
+        sql("create table t1(c1 date, c2 timestamp) using parquet")
+        sql("""
+            |insert into t1 values
+            |  (date'2009-07-30', timestamp'2009-07-30 12:34:56'),
+            |  (date'2024-02-29', timestamp'2024-02-29 23:59:59'),
+            |  (null, null)
+            |""".stripMargin)
+
+        // DATE column
+        checkSparkAnswerAndOperator("select dayofweek(c1) from t1 where c1 is 
not null")
+
+        // NULL DATE input should return NULL
+        checkSparkAnswerAndOperator("select dayofweek(c1) from t1 where c1 is 
null")
+
+        // TIMESTAMP column
+        checkSparkAnswerAndOperator("select dayofweek(c2) from t1 where c2 is 
not null")
+
+        // NULL TIMESTAMP input should return NULL
+        checkSparkAnswerAndOperator("select dayofweek(c2) from t1 where c2 is 
null")
+      }
+    }
+  }
+
   test("stddev_samp function with UDAF fallback") {
     withSQLConf("spark.auron.udafFallback.enable" -> "true") {
       withTable("t1") {
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
index df5c0732..aee252f2 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
@@ -274,6 +274,54 @@ object NativeConverters extends Logging {
     override def toString(): String = s"$getClass() dataType:$dataType)"
   }
 
+  private def buildSparkUdfWrapperExpr(
+      sparkExpr: Expression,
+      fallback: Expression => pb.PhysicalExprNode): pb.PhysicalExprNode = {
+    // update subquery result if needed
+    sparkExpr.foreach {
+      case subquery: ExecSubqueryExpression =>
+        prepareExecSubquery(subquery)
+      case _ =>
+    }
+    val exprString = sparkExpr.toString()
+
+    // bind all convertible children
+    val convertedChildren = mutable.LinkedHashMap[pb.PhysicalExprNode, 
BoundReference]()
+    val bound = sparkExpr.mapChildren(_.transformDown {
+      case p: Literal => p
+      case p =>
+        try {
+          val convertedChild = convertExprWithFallback(p, isPruningExpr = 
false, fallback)
+          val nextBindIndex = convertedChildren.size
+          convertedChildren.getOrElseUpdate(
+            convertedChild,
+            BoundReference(nextBindIndex, p.dataType, p.nullable))
+        } catch {
+          case _: Exception | _: NotImplementedError => p
+        }
+    })
+
+    val paramsSchema = StructType(
+      convertedChildren.values
+        .map(ref => StructField("", ref.dataType, ref.nullable))
+        .toSeq)
+
+    val serialized =
+      serializeExpression(bound.asInstanceOf[Expression with Serializable], 
paramsSchema)
+
+    pb.PhysicalExprNode
+      .newBuilder()
+      .setSparkUdfWrapperExpr(
+        pb.PhysicalSparkUDFWrapperExprNode
+          .newBuilder()
+          .setSerialized(ByteString.copyFrom(serialized))
+          .setReturnType(convertDataType(bound.dataType))
+          .setReturnNullable(bound.nullable)
+          .addAllParams(convertedChildren.keys.asJava)
+          .setExprString(exprString))
+      .build()
+  }
+
   def convertExpr(sparkExpr: Expression): pb.PhysicalExprNode = {
     def fallbackToError: Expression => pb.PhysicalExprNode = { e =>
       throw new NotImplementedError(s"unsupported expression: (${e.getClass}) 
$e")
@@ -321,52 +369,7 @@ object NativeConverters extends Logging {
     } catch {
       case e: NotImplementedError =>
         logWarning(s"Falling back expression: $e")
-
-        // update subquery result if needed
-        sparkExpr.foreach {
-          case subquery: ExecSubqueryExpression =>
-            prepareExecSubquery(subquery)
-          case _ =>
-        }
-        val exprString = sparkExpr.toString()
-
-        // bind all convertible children
-        val convertedChildren = mutable.LinkedHashMap[pb.PhysicalExprNode, 
BoundReference]()
-        val bound = sparkExpr.mapChildren(_.transformDown {
-          case p: Literal => p
-          case p =>
-            try {
-              val convertedChild =
-                convertExprWithFallback(p, isPruningExpr = false, 
fallbackToError)
-              val nextBindIndex = convertedChildren.size
-              convertedChildren.getOrElseUpdate(
-                convertedChild,
-                BoundReference(nextBindIndex, p.dataType, p.nullable))
-            } catch {
-              case _: Exception | _: NotImplementedError => p
-            }
-        })
-
-        val paramsSchema = StructType(
-          convertedChildren.values
-            .map(ref => StructField("", ref.dataType, ref.nullable))
-            .toSeq)
-
-        val serialized =
-          serializeExpression(bound.asInstanceOf[Expression with 
Serializable], paramsSchema)
-
-        // build SparkUDFWrapperExpr
-        pb.PhysicalExprNode
-          .newBuilder()
-          .setSparkUdfWrapperExpr(
-            pb.PhysicalSparkUDFWrapperExprNode
-              .newBuilder()
-              .setSerialized(ByteString.copyFrom(serialized))
-              .setReturnType(convertDataType(bound.dataType))
-              .setReturnNullable(bound.nullable)
-              .addAllParams(convertedChildren.keys.asJava)
-              .setExprString(exprString))
-          .build()
+        buildSparkUdfWrapperExpr(sparkExpr, fallbackToError)
     }
   }
 
@@ -467,27 +470,36 @@ object NativeConverters extends Logging {
         }
 
       // cast
-      // not performing native cast for timestamp/dates (will use UDFWrapper 
instead)
-      case cast: Cast
-          if !Seq(cast.dataType, cast.child.dataType).exists(t =>
-            t.isInstanceOf[TimestampType] || t.isInstanceOf[DateType]) =>
-        val castChild =
-          if (cast.child.dataType == StringType &&
-            (cast.dataType.isInstanceOf[NumericType] || cast.dataType
-              .isInstanceOf[BooleanType]) &&
-            castTrimStringEnabled) {
-            // converting Cast(str as num) to StringTrim(Cast(str as num)) if 
enabled
-            StringTrim(cast.child)
-          } else {
-            cast.child
+      case cast: Cast =>
+        val involvesDateOrTimestamp =
+          Seq(cast.dataType, cast.child.dataType).exists {
+            case DateType | TimestampType => true
+            case _ => false
+          }
+
+        if (involvesDateOrTimestamp) {
+          // Keep timestamp/date casts executable in native projects by 
wrapping
+          // the Spark expression, since native cast does not support these 
types directly.
+          buildSparkUdfWrapperExpr(cast, fallback)
+        } else {
+          val castChild =
+            if (cast.child.dataType == StringType &&
+              (cast.dataType.isInstanceOf[NumericType] || cast.dataType
+                .isInstanceOf[BooleanType]) &&
+              castTrimStringEnabled) {
+              // converting Cast(str as num) to StringTrim(Cast(str as num)) 
if enabled
+              StringTrim(cast.child)
+            } else {
+              cast.child
+            }
+          buildExprNode {
+            _.setTryCast(
+              pb.PhysicalTryCastNode
+                .newBuilder()
+                .setExpr(convertExprWithFallback(castChild, isPruningExpr, 
fallback))
+                .setArrowType(convertDataType(cast.dataType))
+                .build())
           }
-        buildExprNode {
-          _.setTryCast(
-            pb.PhysicalTryCastNode
-              .newBuilder()
-              .setExpr(convertExprWithFallback(castChild, isPruningExpr, 
fallback))
-              .setArrowType(convertDataType(cast.dataType))
-              .build())
         }
 
       // in
@@ -926,6 +938,8 @@ object NativeConverters extends Logging {
       case Year(child) => buildExtScalarFunction("Spark_Year", child :: Nil, 
IntegerType)
       case Month(child) => buildExtScalarFunction("Spark_Month", child :: Nil, 
IntegerType)
       case DayOfMonth(child) => buildExtScalarFunction("Spark_Day", child :: 
Nil, IntegerType)
+      case DayOfWeek(child) =>
+        buildExtScalarFunction("Spark_DayOfWeek", child :: Nil, IntegerType)
       case Quarter(child) => buildExtScalarFunction("Spark_Quarter", child :: 
Nil, IntegerType)
 
       case e: Levenshtein =>

Reply via email to