This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 810782da9 feat: Support Spark expression days (#3746)
810782da9 is described below
commit 810782da952022d1f4b3e1fbb05a7724d0e84770
Author: ChenChen Lai <[email protected]>
AuthorDate: Tue Mar 24 20:29:31 2026 +0800
feat: Support Spark expression days (#3746)
---
docs/source/user-guide/latest/expressions.md | 1 +
.../org/apache/comet/serde/QueryPlanSerde.scala | 1 +
.../scala/org/apache/comet/serde/datetime.scala | 53 +++++++++++-
.../comet/CometTemporalExpressionSuite.scala | 96 +++++++++++++++++++++-
4 files changed, 149 insertions(+), 2 deletions(-)
diff --git a/docs/source/user-guide/latest/expressions.md
b/docs/source/user-guide/latest/expressions.md
index 57b7a3455..c8e5475d0 100644
--- a/docs/source/user-guide/latest/expressions.md
+++ b/docs/source/user-guide/latest/expressions.md
@@ -99,6 +99,7 @@ Expressions that are not Spark-compatible will fall back to Spark by default and
| DateFormat | `date_format` | Yes | Partial support. Only specific format patterns are supported. |
| DateSub | `date_sub` | Yes | |
| DatePart | `date_part(field, source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` |
+| Days | `days` | Yes | V2 partition transform. Supports DateType and TimestampType inputs. |
| Extract | `extract(field FROM source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` |
| FromUnixTime | `from_unixtime` | No | Does not support format, supports only -8334601211038 <= sec <= 8210266876799 |
| Hour | `hour` | No | Incorrectly applies timezone conversion to TimestampNTZ inputs ([#3180](https://github.com/apache/datafusion-comet/issues/3180)) |
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 8c39ba779..02a76f69f 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -196,6 +196,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[DateAdd] -> CometDateAdd,
classOf[DateDiff] -> CometDateDiff,
classOf[DateFormatClass] -> CometDateFormat,
+ classOf[Days] -> CometDays,
classOf[DateSub] -> CometDateSub,
classOf[UnixDate] -> CometUnixDate,
classOf[FromUnixTime] -> CometFromUnixTime,
diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala
b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
index 0720f785d..8f3894c1a 100644
--- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
@@ -21,11 +21,13 @@ package org.apache.comet.serde
import java.util.Locale
-import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd,
DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear,
GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay,
Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay,
WeekOfYear, Year}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd,
DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days,
GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay,
Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay,
WeekOfYear, Year}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, IntegerType, StringType,
TimestampType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.CometGetDateField.CometGetDateField
import org.apache.comet.serde.ExprOuterClass.Expr
import org.apache.comet.serde.QueryPlanSerde._
@@ -586,3 +588,52 @@ object CometDateFormat extends
CometExpressionSerde[DateFormatClass] {
}
}
}
+
+/**
+ * Serializes Spark's `Days` V2 partition transform, which yields the number of days since the
+ * Unix epoch (1970-01-01) as an IntegerType, for DateType and TimestampType children.
+ *
+ * For DateType: dates are internally stored as days since epoch, so the serialized child is
+ * wrapped directly in a LEGACY-mode Cast to IntegerType (same as CometUnixDate).
+ *
+ * For TimestampType: a Cast(Timestamp to Date) using the session time zone is applied first so
+ * the date boundary follows that zone. Any other input type is reported through withInfo.
+ */
+object CometDays extends CometExpressionSerde[Days] {
+ override def convert(
+ expr: Days,
+ inputs: Seq[Attribute],
+ binding: Boolean): Option[ExprOuterClass.Expr] = {
+ val childExpr = exprToProtoInternal(expr.child, inputs, binding)
+
+ // Normalize input to DateType (a Timestamp child is cast to Date in the session time zone first)
+ val dateExprOpt = expr.child.dataType match {
+ case DateType => childExpr
+ case TimestampType =>
+ val timezone = SQLConf.get.sessionLocalTimeZone
+ childExpr.flatMap { child =>
+ CometCast.castToProto(expr, Some(timezone), DateType, child,
CometEvalMode.LEGACY)
+ }
+ case other =>
+ withInfo(expr, s"Days does not support input type: $other") // record why no conversion is produced
+ None
+ }
+
+ // Convert DateType to IntegerType (days since epoch); skipped when dateExprOpt is None
+ val optExpr = dateExprOpt.map { dateExpr =>
+ Expr
+ .newBuilder()
+ .setCast(
+ ExprOuterClass.Cast
+ .newBuilder()
+ .setChild(dateExpr)
+ .setDatatype(serializeDataType(IntegerType).get) // NOTE(review): .get assumes IntegerType always serializes - confirm
+ .setEvalMode(ExprOuterClass.EvalMode.LEGACY)
+ .setAllowIncompat(false)
+ .build())
+ .build()
+ }
+
+ optExprWithInfo(optExpr, expr, expr.child)
+ }
+}
diff --git
a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
index 1ae6926e0..9f5413933 100644
--- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
@@ -21,8 +21,11 @@ package org.apache.comet
import scala.util.Random
-import org.apache.spark.sql.{CometTestBase, Row, SaveMode}
+import org.apache.spark.sql.{CometTestBase, DataFrame, Row, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.{Days, Literal}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
@@ -395,4 +398,95 @@ class CometTemporalExpressionSuite extends CometTestBase
with AdaptiveSparkPlanH
// Test null handling
checkSparkAnswerAndOperator("SELECT unix_date(NULL)")
}
+
+ /**
+ * Checks that the Comet-evaluated DataFrame produces the same rows as the baseline DataFrame
+ * collected with Comet disabled, then passes the executed plan of the Comet DataFrame (after
+ * stripAQEPlan) to checkCometOperators. This is needed because Days is a
+ * PartitionTransformExpression that extends Unevaluable, so checkSparkAnswerAndOperator cannot be
+ * used directly.
+ *
+ * @param cometDF DataFrame containing the Days expression under test
+ * @param baselineDF DataFrame whose rows, collected with Comet disabled, are the expected answer
+ */
+ private def checkDays(cometDF: DataFrame, baselineDF: DataFrame): Unit = {
+ // Ensure the expected answer is evaluated solely by native Spark JVM
(Comet off)
+ var expected: Array[Row] = Array.empty // assigned inside the withSQLConf block below
+ withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+ expected = baselineDF.collect()
+ }
+ checkAnswer(cometDF, expected.toSeq)
+ checkCometOperators(stripAQEPlan(cometDF.queryExecution.executedPlan))
+ }
+
+ test("days - date input") { // Days over a nullable DateType column, compared against unix_date(d)
+ val r = new Random(42) // fixed seed
+ val dateSchema = StructType(Seq(StructField("d", DataTypes.DateType,
true)))
+ val dateDF = FuzzDataGenerator.generateDataFrame(r, spark, dateSchema,
1000, DataGenOptions())
+
+ checkDays(
+ dateDF.select(col("d"),
getColumnFromExpression(Days(UnresolvedAttribute("d")))),
+ dateDF.selectExpr("d", "unix_date(d)")) // baseline expression for the expected answer
+ }
+
+ test("days - timestamp input") { // Days over a nullable TimestampType column under several session time zones
+ val r = new Random(42) // fixed seed
+ val tsSchema = StructType(Seq(StructField("ts", DataTypes.TimestampType,
true)))
+ val tsDF = FuzzDataGenerator.generateDataFrame(r, spark, tsSchema, 1000,
DataGenOptions())
+
+ for (timezone <- Seq("UTC", "America/Los_Angeles", "Asia/Tokyo")) {
+ withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timezone) { // same data is re-checked under each zone
+ checkDays(
+ tsDF.select(col("ts"),
getColumnFromExpression(Days(UnresolvedAttribute("ts")))),
+ tsDF.selectExpr("ts", "unix_date(cast(ts as date))")) // baseline: cast to date, then unix_date
+ }
+ }
+ }
+
+ test("days - literal edge cases") { // Days over literal date/timestamp/null inputs on a one-row DataFrame
+ withSQLConf(
+ SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
+ "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { // presumably keeps literal expressions from being folded before execution - confirm
+
+ val dummyDF = spark.range(1)
+
+ // Pre-epoch dates (should return negative day numbers)
+ checkDays(
+ dummyDF.select(
+ getColumnFromExpression(
+ Days(Literal.create(java.sql.Date.valueOf("1969-12-31"),
DataTypes.DateType))),
+ getColumnFromExpression(
+ Days(Literal.create(java.sql.Date.valueOf("1960-01-01"),
DataTypes.DateType)))),
+ dummyDF.selectExpr("unix_date(DATE('1969-12-31'))",
"unix_date(DATE('1960-01-01'))"))
+
+ // Epoch day itself, the day after, and a recent date
+ checkDays(
+ dummyDF.select(
+ getColumnFromExpression(
+ Days(Literal.create(java.sql.Date.valueOf("1970-01-01"),
DataTypes.DateType))),
+ getColumnFromExpression(
+ Days(Literal.create(java.sql.Date.valueOf("1970-01-02"),
DataTypes.DateType))),
+ getColumnFromExpression(
+ Days(Literal.create(java.sql.Date.valueOf("2024-01-01"),
DataTypes.DateType)))),
+ dummyDF.selectExpr(
+ "unix_date(DATE('1970-01-01'))",
+ "unix_date(DATE('1970-01-02'))",
+ "unix_date(DATE('2024-01-01'))"))
+
+ // Timestamp literals. NOTE(review): Timestamp.valueOf parses in the JVM default time zone, while the SQL baseline presumably uses the session time zone - confirm both agree in this suite
+ checkDays(
+ dummyDF.select(
+ getColumnFromExpression(Days(Literal
+ .create(java.sql.Timestamp.valueOf("1970-01-01 00:00:00"),
DataTypes.TimestampType))),
+ getColumnFromExpression(
+ Days(
+ Literal.create(
+ java.sql.Timestamp.valueOf("2024-06-15 10:30:00"),
+ DataTypes.TimestampType)))),
+ dummyDF.selectExpr(
+ "unix_date(cast(TIMESTAMP('1970-01-01 00:00:00') as date))",
+ "unix_date(cast(TIMESTAMP('2024-06-15 10:30:00') as date))"))
+
+ // Null handling: a null DateType literal compared against unix_date of a null date
+ checkDays(
+ dummyDF.select(getColumnFromExpression(Days(Literal.create(null,
DataTypes.DateType)))),
+ dummyDF.selectExpr("unix_date(cast(NULL as date))"))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]