This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c577ae7202a7  [SPARK-51423][SQL] Add the current_time() function for TIME datatype
c577ae7202a7 is described below

commit c577ae7202a778a23601925b4d803d3389f0c0dd
Author:     Sakthi <sak...@apache.org>
AuthorDate: Fri Apr 11 11:12:00 2025 +0200

    [SPARK-51423][SQL] Add the current_time() function for TIME datatype

### What changes were proposed in this pull request?

This PR adds support for a new function, `current_time()`, which returns the current time at the start of query evaluation.

```bash
# happy cases
scala> spark.sql("SELECT current_time(0);").show()
+---------------+
|current_time(0)|
+---------------+
|       17:11:26|
+---------------+

scala> spark.sql("SELECT current_time(3);").show()
+---------------+
|current_time(3)|
+---------------+
|   17:11:50.225|
+---------------+

scala> spark.sql("SELECT current_time(6);").show()
+---------------+
|current_time(6)|
+---------------+
|17:12:00.734735|
+---------------+

# no parentheses and empty parentheses
scala> spark.sql("SELECT current_time;").show()
+---------------+
|current_time(6)|
+---------------+
|17:12:23.132088|
+---------------+

scala> spark.sql("SELECT current_time();").show()
+---------------+
|current_time(6)|
+---------------+
|17:12:26.718602|
+---------------+

# foldability

## nested arithmetic
scala> spark.sql("SELECT current_time((4 - 2) * (1 + 1));").show()
+---------------------------------+
|current_time(((4 - 2) * (1 + 1)))|
+---------------------------------+
|                    17:13:04.4647|
+---------------------------------+

## casting string literals
scala> spark.sql("SELECT current_time(CAST(' 0005 ' AS INT));").show()
+---------------------------------+
|current_time(CAST( 0005  AS INT))|
+---------------------------------+
|                   17:13:26.28039|
+---------------------------------+

scala> spark.sql("SELECT current_time('5');").show()
+---------------+
|current_time(5)|
+---------------+
| 22:34:07.65007|
+---------------+

## combining cast and arithmetic
scala> spark.sql("SELECT current_time(CAST('4' AS INT) * CAST('1' AS INT));").show()
+-----------------------------------------------+
|current_time((CAST(4 AS INT) * CAST(1 AS INT)))|
+-----------------------------------------------+
|                                  17:14:06.7151|
+-----------------------------------------------+

# failure cases
scala> spark.sql("SELECT current_time(-1);").show()
org.apache.spark.sql.catalyst.ExtendedAnalysisException: [DATATYPE_MISMATCH.VALUE_OUT_OF_RANGE]
Cannot resolve "current_time(-1)" due to data type mismatch: The `precision` must be between
[0, 6] (current value = -1). SQLSTATE: 42K09; line 1 pos 7;
'Project [unresolvedalias(current_time(-1))]
+- OneRowRelation

scala> spark.sql("SELECT current_time('foo');").show()
org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT] The value 'foo' of the type
"STRING" cannot be cast to "INT" because it is malformed. Correct the value as per the syntax,
or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead.
SQLSTATE: 22018
== SQL (line 1, position 8) ==
SELECT current_time('foo');

scala> spark.sql("SELECT current_time(2,2);").show()
org.apache.spark.sql.AnalysisException: [WRONG_NUM_ARGS.WITHOUT_SUGGESTION] The `current_time`
requires [0, 1] parameters but the actual number is 2. Please, refer to
'https://spark.apache.org/docs/latest/sql-ref-functions.html' for a fix. SQLSTATE: 42605;
line 1 pos 7

# all calls of current_time within the same query should return the same value
scala> val df = spark.sql("""
     |   SELECT
     |     current_time    AS col1,
     |     current_time()  AS col2,
     |     current_time(0) AS col3,
     |     current_time(1) AS col4,
     |     current_time(2) AS col5,
     |     current_time(3) AS col6,
     |     current_time(4) AS col7,
     |     current_time(5) AS col8,
     |     current_time(6) AS col9,
     |     current_time    AS col10
     |   """)
val df: org.apache.spark.sql.DataFrame = [col1: time(6), col2: time(6) ... 8 more fields]

scala> df.show()
+---------------+---------------+--------+----------+-----------+-----------+-------------+--------------+---------------+---------------+
|           col1|           col2|    col3|      col4|       col5|       col6|         col7|          col8|           col9|          col10|
+---------------+---------------+--------+----------+-----------+-----------+-------------+--------------+---------------+---------------+
|17:15:47.680648|17:15:47.680648|17:15:47|17:15:47.6|17:15:47.68|17:15:47.68|17:15:47.6806|17:15:47.68064|17:15:47.680648|17:15:47.680648|
+---------------+---------------+--------+----------+-----------+-----------+-------------+--------------+---------------+---------------+
```

### Why are the changes needed?

Adds a built-in `current_time([n])` function that returns just the time portion of the current moment, as a `TIME(n)` value. This aligns Spark with other SQL systems that offer a native time function, improves convenience for time-only queries, and complements the existing `current_date` and `current_timestamp` functions.

### Does this PR introduce _any_ user-facing change?

Yes, it adds a new function: users can now get the current time with `current_time()`.

### How was this patch tested?

Manual testing as shown above, plus the newly added unit tests:

```bash
$ build/sbt "test:testOnly *TimeExpressionsSuite"
$ build/sbt "test:testOnly *ComputeCurrentTimeSuite"
$ build/sbt "test:testOnly *ResolveInlineTablesSuite"
$ build/sbt "test:testOnly *AnalysisSuite"
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50336 from the-sakthi/SPARK-51162.
Authored-by: Sakthi <sak...@apache.org>
Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   2 +-
 .../sql/catalyst/util/SparkDateTimeUtils.scala     |  33 ++
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   1 +
 .../analysis/LiteralFunctionResolution.scala       |  15 +--
 .../sql/catalyst/expressions/timeExpressions.scala | 114 ++++++++++++++++++++-
 .../sql/catalyst/optimizer/finishAnalysis.scala    |   6 ++
 .../spark/sql/catalyst/parser/AstBuilder.scala     |   4 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala      |  19 ++++
 .../analysis/ResolveInlineTablesSuite.scala        |  30 +++++-
 .../expressions/TimeExpressionsSuite.scala         |  49 ++++++++-
 .../optimizer/ComputeCurrentTimeSuite.scala        |  94 ++++++++++++++++-
 .../catalyst/parser/ExpressionParserSuite.scala    |   4 +-
 .../sql-functions/sql-expression-schema.md         |   1 +
 .../org/apache/spark/sql/SQLQueryTestHelper.scala  |   5 +-
 .../sql/expressions/ExpressionInfoSuite.scala      |   1 +
 15 files changed, 357 insertions(+), 21 deletions(-)

diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index ec23c7346216..746c9d9386d0 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -1155,7 +1155,7 @@ datetimeUnit
     ;
 
 primaryExpression
-    : name=(CURRENT_DATE | CURRENT_TIMESTAMP | CURRENT_USER | USER | SESSION_USER)                #currentLike
+    : name=(CURRENT_DATE | CURRENT_TIMESTAMP | CURRENT_USER | USER | SESSION_USER | CURRENT_TIME) #currentLike
     | name=(TIMESTAMPADD | DATEADD | DATE_ADD) LEFT_PAREN (unit=datetimeUnit | invalidUnit=stringLit) COMMA unitsAmount=valueExpression COMMA timestamp=valueExpression RIGHT_PAREN #timestampadd
     | name=(TIMESTAMPDIFF | DATEDIFF | DATE_DIFF | TIMEDIFF) LEFT_PAREN (unit=datetimeUnit | invalidUnit=stringLit) COMMA startTimestamp=valueExpression COMMA endTimestamp=valueExpression RIGHT_PAREN #timestampdiff
     | CASE whenClause+ (ELSE elseExpression=expression)? END #searchedCase
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
index 73fbebeb98c5..b16ee9ad1929 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala
@@ -133,6 +133,39 @@ trait SparkDateTimeUtils {
     }
   }
 
+  /**
+   * Gets the number of microseconds since midnight using the session time zone.
+   */
+  def instantToMicrosOfDay(instant: Instant, timezone: String): Long = {
+    val zoneId = getZoneId(timezone)
+    val localDateTime = LocalDateTime.ofInstant(instant, zoneId)
+    localDateTime.toLocalTime.getLong(MICRO_OF_DAY)
+  }
+
+  /**
+   * Truncates a time value (in microseconds) to the specified fractional precision `p`.
+   *
+   * For example, if `p = 3`, we keep millisecond resolution and discard any digits beyond the
+   * thousand-microsecond place. So a value like `123456` microseconds (12:34:56.123456) becomes
+   * `123000` microseconds (12:34:56.123).
+   *
+   * @param micros
+   *   The original time in microseconds.
+   * @param p
+   *   The fractional second precision (range 0 to 6).
+   * @return
+   *   The truncated microsecond value, preserving only `p` fractional digits.
+   */
+  def truncateTimeMicrosToPrecision(micros: Long, p: Int): Long = {
+    assert(
+      p >= TimeType.MIN_PRECISION && p <= TimeType.MICROS_PRECISION,
+      s"Fractional second precision $p out" +
+        s" of range [${TimeType.MIN_PRECISION}..${TimeType.MICROS_PRECISION}].")
+    val scale = TimeType.MICROS_PRECISION - p
+    val factor = math.pow(10, scale).toLong
+    (micros / factor) * factor
+  }
+
   /**
    * Converts the timestamp `micros` from one timezone to another.
    *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 537469b2af27..7ffeedcf9474 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -627,6 +627,7 @@ object FunctionRegistry {
     expression[CurrentDate]("current_date"),
     expressionBuilder("curdate", CurDateExpressionBuilder, setAlias = true),
     expression[CurrentTimestamp]("current_timestamp"),
+    expression[CurrentTime]("current_time"),
     expression[CurrentTimeZone]("current_timezone"),
     expression[LocalTimestamp]("localtimestamp"),
     expression[DateDiff]("datediff"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/LiteralFunctionResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/LiteralFunctionResolution.scala
index c7faf0536b77..865a780b61da 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/LiteralFunctionResolution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/LiteralFunctionResolution.scala
@@ -17,16 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.expressions.{
-  Alias,
-  CurrentDate,
-  CurrentTimestamp,
-  CurrentUser,
-  Expression,
-  GroupingID,
-  NamedExpression,
-  VirtualColumn
-}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTime, CurrentTimestamp, CurrentUser, Expression, GroupingID, NamedExpression, VirtualColumn}
 import org.apache.spark.sql.catalyst.util.toPrettySQL
 
 /**
@@ -47,10 +38,12 @@ object LiteralFunctionResolution {
     }
   }
 
-  // support CURRENT_DATE, CURRENT_TIMESTAMP, CURRENT_USER, USER, SESSION_USER and grouping__id
+  // support CURRENT_DATE, CURRENT_TIMESTAMP, CURRENT_TIME,
+  // CURRENT_USER, USER, SESSION_USER and grouping__id
   private val literalFunctions: Seq[(String, () => Expression, Expression => String)] = Seq(
     (CurrentDate().prettyName, () => CurrentDate(), toPrettySQL(_)),
     (CurrentTimestamp().prettyName, () => CurrentTimestamp(), toPrettySQL(_)),
+    (CurrentTime().prettyName, () => CurrentTime(), toPrettySQL(_)),
     (CurrentUser().prettyName, () => CurrentUser(), toPrettySQL),
     ("user", () => CurrentUser(), toPrettySQL),
     ("session_user", () => CurrentUser(), toPrettySQL),
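The truncation helper above is plain integer arithmetic on a microsecond-of-day value. A minimal standalone sketch of the same idea (the object name and hard-coded bounds here are ours for illustration; Spark's version reads the bounds from `TimeType`):

```scala
// Standalone sketch of truncateTimeMicrosToPrecision (illustrative only).
object TruncationSketch {
  def truncate(micros: Long, p: Int): Long = {
    require(p >= 0 && p <= 6, s"precision $p out of range [0..6]")
    val factor = math.pow(10, 6 - p).toLong // e.g. p = 3 => factor = 1000
    (micros / factor) * factor              // integer division drops sub-precision digits
  }

  def main(args: Array[String]): Unit = {
    // 17:11:50.225123 expressed as microseconds since midnight
    val micros = ((17L * 60 + 11) * 60 + 50) * 1000000L + 225123L
    println(truncate(micros, 3)) // keeps milliseconds: ...225000
    println(truncate(micros, 0)) // whole seconds:      ...000000
  }
}
```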
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala
index 8b4fa13fe276..505b00231ecd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala
@@ -19,13 +19,17 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.time.DateTimeException
 
-import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder
+import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, TypeCheckResult}
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.Cast.{toSQLExpr, toSQLId, toSQLType, toSQLValue}
 import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{CURRENT_LIKE, TreePattern}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.TimeFormatter
+import org.apache.spark.sql.catalyst.util.TypeUtils.ordinalNumber
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.internal.types.StringTypeWithCollation
-import org.apache.spark.sql.types.{AbstractDataType, IntegerType, ObjectType, TimeType, TypeCollection}
+import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, ObjectType, TimeType, TypeCollection}
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -349,3 +353,109 @@ object SecondExpressionBuilder extends ExpressionBuilder {
   }
 }
 
+/**
+ * Returns the current time at the start of query evaluation.
+ * There is no code generation since this expression should get constant folded by the optimizer.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_([precision]) - Returns the current time at the start of query evaluation.
+      All calls of current_time within the same query return the same value.
+
+    _FUNC_ - Returns the current time at the start of query evaluation.
+  """,
+  arguments = """
+    Arguments:
+      * precision - An optional integer literal in the range [0..6], indicating how many
+                    fractional digits of seconds to include. If omitted, the default is 6.
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_();
+       15:49:11.914120
+      > SELECT _FUNC_;
+       15:49:11.914120
+      > SELECT _FUNC_(0);
+       15:49:11
+      > SELECT _FUNC_(3);
+       15:49:11.914
+      > SELECT _FUNC_(1+1);
+       15:49:11.91
+  """,
+  group = "datetime_funcs",
+  since = "4.1.0"
+)
+case class CurrentTime(child: Expression = Literal(TimeType.MICROS_PRECISION))
+  extends UnaryExpression with FoldableUnevaluable with ImplicitCastInputTypes {
+
+  def this() = {
+    this(Literal(TimeType.MICROS_PRECISION))
+  }
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(CURRENT_LIKE)
+
+  override def nullable: Boolean = false
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    // Check foldability
+    if (!child.foldable) {
+      return DataTypeMismatch(
+        errorSubClass = "NON_FOLDABLE_INPUT",
+        messageParameters = Map(
+          "inputName" -> toSQLId("precision"),
+          "inputType" -> toSQLType(child.dataType),
+          "inputExpr" -> toSQLExpr(child)
+        )
+      )
+    }
+
+    // Evaluate
+    val precisionValue = child.eval()
+    if (precisionValue == null) {
+      return DataTypeMismatch(
+        errorSubClass = "UNEXPECTED_NULL",
+        messageParameters = Map("exprName" -> "precision"))
+    }
+
+    // Check numeric range
+    precisionValue match {
+      case n: Number =>
+        val p = n.intValue()
+        if (p < TimeType.MIN_PRECISION || p > TimeType.MICROS_PRECISION) {
+          return DataTypeMismatch(
+            errorSubClass = "VALUE_OUT_OF_RANGE",
+            messageParameters = Map(
+              "exprName" -> toSQLId("precision"),
+              "valueRange" -> s"[${TimeType.MIN_PRECISION}, ${TimeType.MICROS_PRECISION}]",
+              "currentValue" -> toSQLValue(p, IntegerType)
+            )
+          )
+        }
+      case _ =>
+        return DataTypeMismatch(
+          errorSubClass = "UNEXPECTED_INPUT_TYPE",
+          messageParameters = Map(
+            "paramIndex" -> ordinalNumber(0),
+            "requiredType" -> toSQLType(IntegerType),
+            "inputSql" -> toSQLExpr(child),
+            "inputType" -> toSQLType(child.dataType))
+        )
+    }
+    TypeCheckSuccess
+  }
+
+  // Because checkInputDataTypes ensures the argument is foldable & valid,
+  // we can directly evaluate here.
+  lazy val precision: Int = child.eval().asInstanceOf[Number].intValue()
+
+  override def dataType: DataType = TimeType(precision)
+
+  override def prettyName: String = "current_time"
+
+  override protected def withNewChildInternal(newChild: Expression): Expression = {
+    copy(child = newChild)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType)
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 0fbfce5962c7..21e09f2e56d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.trees.TreePatternBits
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, instantToMicros, localDateTimeToMicros}
+import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.{instantToMicrosOfDay, truncateTimeMicrosToPrecision}
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.types._
@@ -113,6 +114,7 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
     val instant = Instant.now()
     val currentTimestampMicros = instantToMicros(instant)
     val currentTime = Literal.create(currentTimestampMicros, TimestampType)
+    val currentTimeOfDayMicros = instantToMicrosOfDay(instant, conf.sessionLocalTimeZone)
     val timezone = Literal.create(conf.sessionLocalTimeZone, StringType)
     val currentDates = collection.mutable.HashMap.empty[ZoneId, Literal]
     val localTimestamps = collection.mutable.HashMap.empty[ZoneId, Literal]
@@ -129,6 +131,10 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
           Literal.create(
             DateTimeUtils.microsToDays(currentTimestampMicros, cd.zoneId), DateType)
         })
+      case currentTimeType: CurrentTime =>
+        val truncatedTime = truncateTimeMicrosToPrecision(currentTimeOfDayMicros,
+          currentTimeType.precision)
+        Literal.create(truncatedTime, TimeType(currentTimeType.precision))
       case CurrentTimestamp() | Now() => currentTime
       case CurrentTimeZone() => timezone
       case localTimestamp: LocalTimestamp =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index b40075fcccee..d8ce90f40b89 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2888,12 +2888,14 @@ class AstBuilder extends DataTypeAstBuilder
         CurrentDate()
       case SqlBaseParser.CURRENT_TIMESTAMP =>
         CurrentTimestamp()
+      case SqlBaseParser.CURRENT_TIME =>
+        CurrentTime()
       case SqlBaseParser.CURRENT_USER | SqlBaseParser.USER | SqlBaseParser.SESSION_USER =>
         CurrentUser()
     }
   } else {
     // If the parser is not in ansi mode, we should return `UnresolvedAttribute`, in case there
-    // are columns named `CURRENT_DATE` or `CURRENT_TIMESTAMP`.
+    // are columns named `CURRENT_DATE`, `CURRENT_TIMESTAMP` or `CURRENT_TIME`.
     UnresolvedAttribute.quoted(ctx.name.getText)
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index eab4ddc666be..f0dabfd976a7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -819,6 +819,25 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     }
   }
 
+  test("CURRENT_TIME should be case insensitive") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      val input = Project(Seq(
+        // The user references "current_time" or "CURRENT_TIME" in the query
+        UnresolvedAttribute("current_time"),
+        UnresolvedAttribute("CURRENT_TIME")
+      ), testRelation)
+
+      // The analyzer should resolve both to the same expression: CurrentTime()
+      val expected = Project(Seq(
+        Alias(CurrentTime(), toPrettySQL(CurrentTime()))(),
+        Alias(CurrentTime(), toPrettySQL(CurrentTime()))()
+      ), testRelation).analyze
+
+      checkAnalysis(input, expected)
+    }
+  }
+
+
   test("CTE with non-existing column alias") {
     assertAnalysisErrorCondition(parsePlan("WITH t(x) AS (SELECT 1) SELECT * FROM t WHERE y = 1"),
       "UNRESOLVED_COLUMN.WITH_SUGGESTION",
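The heart of the `ComputeCurrentTime` change is that a single `Instant` is captured per query, and every `current_time(p)` call site becomes a literal derived from it, truncated to its own precision. A self-contained sketch of that behavior, using plain collections in place of a Catalyst plan (all names here are illustrative):

```scala
import java.time.{Instant, LocalTime, ZoneId}

// Toy model of ComputeCurrentTime: capture "now" once, then substitute a
// truncated literal for every current_time(p) occurrence in the "plan".
def foldCurrentTime(precisions: Seq[Int], sessionTz: String): Seq[Long] = {
  val instant = Instant.now() // captured once per query
  val microsOfDay = LocalTime.ofInstant(instant, ZoneId.of(sessionTz)).toNanoOfDay / 1000
  precisions.map { p =>
    val factor = math.pow(10, 6 - p).toLong
    (microsOfDay / factor) * factor
  }
}

// All three call sites share the same captured instant:
// foldCurrentTime(Seq(0, 3, 6), "Europe/Amsterdam")
```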
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
index f231164d5c25..662c740e2f96 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
@@ -21,11 +21,11 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.EvaluateUnresolvedInlineTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, CurrentTimestamp, Literal, Rand}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, CurrentTime, CurrentTimestamp, Literal, Rand}
 import org.apache.spark.sql.catalyst.expressions.aggregate.Count
 import org.apache.spark.sql.catalyst.optimizer.{ComputeCurrentTime, EvalInlineTables}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.types.{LongType, NullType, TimestampType}
+import org.apache.spark.sql.types.{LongType, NullType, TimestampType, TimeType}
 
 /**
  * Unit tests for [[ResolveInlineTables]]. Note that there are also test cases defined in
@@ -113,6 +113,32 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {
     }
   }
 
+  test("cast and execute CURRENT_TIME expressions") {
+    val table = UnresolvedInlineTable(
+      Seq("c1"),
+      Seq(
+        Seq(CurrentTime()),
+        Seq(CurrentTime())
+      )
+    )
+    val resolved = ResolveInlineTables(table)
+    assert(resolved.isInstanceOf[ResolvedInlineTable],
+      "Expected an inline table to be resolved into a ResolvedInlineTable")
+
+    val transformed = ComputeCurrentTime(resolved)
+    EvalInlineTables(transformed) match {
+      case LocalRelation(output, data, _, _) =>
+        // expect default precision = 6
+        assert(output.map(_.dataType) == Seq(TimeType(6)))
+        // Should have 2 rows
+        assert(data.size == 2)
+        // Both rows should have the *same* microsecond value for current_time
+        assert(data(0).getLong(0) == data(1).getLong(0),
+          "Both CURRENT_TIME calls must yield the same value in the same query")
+    }
+  }
+
+
   test("convert TimeZoneAwareExpression") {
     val table = UnresolvedInlineTable(Seq("c1"),
       Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType))))
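For context on what this test protects at the SQL level: inline tables (`VALUES`) evaluate their entries during resolution, so the folded `current_time()` literals must agree across rows. A hedged spark-shell probe of the same property (assumes a session with this patch applied; the timestamps shown are illustrative):

```scala
scala> spark.sql("SELECT * FROM VALUES (current_time()), (current_time()) AS t(c1)").show()
+---------------+
|             c1|
+---------------+
|17:15:47.680648|
|17:15:47.680648|
+---------------+
```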
toSQLId("precision"), + "valueRange" -> s"[${TimeType.MIN_PRECISION}, ${TimeType.MICROS_PRECISION}]", + "currentValue" -> toSQLValue(10, IntegerType) + ) + ) + ) + + // test non number value should fail since we skip analyzer here + expr = CurrentTime(Literal("2")) + val failure = intercept[ClassCastException] { + expr.precision + } + assert(failure.getMessage.contains("cannot be cast to class java.lang.Number")) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala index 6e1c7fc887d4..3fa6459a93e2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala @@ -24,12 +24,13 @@ import scala.concurrent.duration._ import scala.jdk.CollectionConverters.MapHasAsScala import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, Expression, InSubquery, ListQuery, Literal, LocalTimestamp, Now} +import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Cast, CurrentDate, CurrentTime, CurrentTimestamp, CurrentTimeZone, Expression, InSubquery, ListQuery, Literal, LocalTimestamp, Now} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.IntegerType import org.apache.spark.unsafe.types.UTF8String class ComputeCurrentTimeSuite extends PlanTest { @@ -52,6 +53,83 @@ class ComputeCurrentTimeSuite extends PlanTest { assert(lits(0) == lits(1)) } + test("analyzer should replace current_time with literals") { + // logical plan that calls current_time() twice in the Project + val planInput = Project( + Seq( + Alias(CurrentTime(Literal(3)), "a")(), + Alias(CurrentTime(Literal(3)), "b")() + ), + LocalRelation() + ) + + val analyzed = planInput.analyze + val optimized = Optimize.execute(analyzed).asInstanceOf[Project] + + // We expect 2 literals in the final Project. Each literal is a Long + // representing microseconds since midnight, truncated to precision=3. + val lits = literals[Long](optimized) // a helper that extracts all Literal values of type Long + assert(lits.size == 2, s"Expected two literal values, found ${lits.size}") + + // The rule should produce the same microsecond value for both columns "a" and "b". + assert(lits(0) == lits(1), + s"Expected both current_time(3) calls to yield the same literal, " + + s"but got ${lits(0)} vs ${lits(1)}") + } + + test("analyzer should replace current_time with foldable child expressions") { + // We build a plan that calls current_time(2 + 1) twice + val foldableExpr = Add(Literal(2), Literal(1)) // a foldable arithmetic expression => 3 + val planInput = Project( + Seq( + Alias(CurrentTime(foldableExpr), "a")(), + Alias(CurrentTime(foldableExpr), "b")() + ), + LocalRelation() + ) + + val analyzed = planInput.analyze + val optimized = Optimize.execute(analyzed).asInstanceOf[Project] + + // We expect the optimizer to replace current_time(2 + 1) with a literal time value, + // so let's extract those literal values. 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
index 6e1c7fc887d4..3fa6459a93e2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -24,12 +24,13 @@ import scala.concurrent.duration._
 import scala.jdk.CollectionConverters.MapHasAsScala
 
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, Expression, InSubquery, ListQuery, Literal, LocalTimestamp, Now}
+import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Cast, CurrentDate, CurrentTime, CurrentTimestamp, CurrentTimeZone, Expression, InSubquery, ListQuery, Literal, LocalTimestamp, Now}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.unsafe.types.UTF8String
 
 class ComputeCurrentTimeSuite extends PlanTest {
@@ -52,6 +53,83 @@ class ComputeCurrentTimeSuite extends PlanTest {
     assert(lits(0) == lits(1))
   }
 
+  test("analyzer should replace current_time with literals") {
+    // logical plan that calls current_time() twice in the Project
+    val planInput = Project(
+      Seq(
+        Alias(CurrentTime(Literal(3)), "a")(),
+        Alias(CurrentTime(Literal(3)), "b")()
+      ),
+      LocalRelation()
+    )
+
+    val analyzed = planInput.analyze
+    val optimized = Optimize.execute(analyzed).asInstanceOf[Project]
+
+    // We expect 2 literals in the final Project. Each literal is a Long
+    // representing microseconds since midnight, truncated to precision=3.
+    val lits = literals[Long](optimized) // a helper that extracts all Literal values of type Long
+    assert(lits.size == 2, s"Expected two literal values, found ${lits.size}")
+
+    // The rule should produce the same microsecond value for both columns "a" and "b".
+    assert(lits(0) == lits(1),
+      s"Expected both current_time(3) calls to yield the same literal, " +
+        s"but got ${lits(0)} vs ${lits(1)}")
+  }
+
+  test("analyzer should replace current_time with foldable child expressions") {
+    // We build a plan that calls current_time(2 + 1) twice
+    val foldableExpr = Add(Literal(2), Literal(1)) // a foldable arithmetic expression => 3
+    val planInput = Project(
+      Seq(
+        Alias(CurrentTime(foldableExpr), "a")(),
+        Alias(CurrentTime(foldableExpr), "b")()
+      ),
+      LocalRelation()
+    )
+
+    val analyzed = planInput.analyze
+    val optimized = Optimize.execute(analyzed).asInstanceOf[Project]
+
+    // We expect the optimizer to replace current_time(2 + 1) with a literal time value,
+    // so let's extract those literal values.
+    val lits = literals[Long](optimized)
+    assert(lits.size == 2, s"Expected two literal values, found ${lits.size}")
+
+    // Both references to current_time(2 + 1) should be replaced by the same microsecond-of-day
+    assert(lits(0) == lits(1),
+      s"Expected both current_time(2 + 1) calls to yield the same literal, " +
+        s"but got ${lits(0)} vs. ${lits(1)}"
+    )
+  }
+
+  test("analyzer should replace current_time with foldable casted string-literal") {
+    // We'll build a foldable cast expression: CAST(' 0005 ' AS INT) => 5
+    val castExpr = Cast(Literal(" 0005 "), IntegerType)
+
+    // Two references to current_time(castExpr) => so we can check they're replaced consistently
+    val planInput = Project(
+      Seq(
+        Alias(CurrentTime(castExpr), "a")(),
+        Alias(CurrentTime(castExpr), "b")()
+      ),
+      LocalRelation()
+    )
+
+    val analyzed = planInput.analyze
+    val optimized = Optimize.execute(analyzed).asInstanceOf[Project]
+
+    val lits = literals[Long](optimized)
+    assert(lits.size == 2, s"Expected two literal values, found ${lits.size}")
+
+    // Both references to current_time(CAST(' 0005 ' AS INT)) in the same query
+    // should produce the same microsecond-of-day literal.
+    assert(lits(0) == lits(1),
+      s"Expected both references to yield the same literal, but got ${lits(0)} vs. ${lits(1)}"
+    )
+  }
+
   test("analyzer should respect time flow in current timestamp calls") {
     val in = Project(Alias(CurrentTimestamp(), "t1")() :: Nil, LocalRelation())
 
@@ -65,6 +143,20 @@ class ComputeCurrentTimeSuite extends PlanTest {
     assert(t2 - t1 <= 1000 && t2 - t1 > 0)
   }
 
+  test("analyzer should respect time flow in current_time calls") {
+    val in = Project(Alias(CurrentTime(Literal(4)), "t1")() :: Nil, LocalRelation())
+
+    val planT1 = Optimize.execute(in.analyze).asInstanceOf[Project]
+    sleep(5)
+    val planT2 = Optimize.execute(in.analyze).asInstanceOf[Project]
+
+    val t1 = literals[Long](planT1)(0) // the microseconds-of-day for planT1
+    val t2 = literals[Long](planT2)(0) // the microseconds-of-day for planT2
+
+    assert(t2 > t1, s"Expected a newer time in the second analysis, but got t1=$t1, t2=$t2")
+  }
+
   test("analyzer should replace current_date with literals") {
     val in = Project(Seq(Alias(CurrentDate(), "a")(), Alias(CurrentDate(), "b")()), LocalRelation())
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
index 2cae4e4fe95f..17b03946251a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
@@ -1194,16 +1194,18 @@ class ExpressionParserSuite extends AnalysisTest {
     }
   }
 
-  test("current date/timestamp braceless expressions") {
+  test("current date/timestamp/time braceless expressions") {
     withSQLConf(SQLConf.ANSI_ENABLED.key -> "true",
       SQLConf.ENFORCE_RESERVED_KEYWORDS.key -> "true") {
       assertEqual("current_date", CurrentDate())
       assertEqual("current_timestamp", CurrentTimestamp())
+      assertEqual("current_time", CurrentTime())
     }
 
     def testNonAnsiBehavior(): Unit = {
       assertEqual("current_date", UnresolvedAttribute.quoted("current_date"))
       assertEqual("current_timestamp", UnresolvedAttribute.quoted("current_timestamp"))
+      assertEqual("current_time", UnresolvedAttribute.quoted("current_time"))
     }
     withSQLConf(
       SQLConf.ANSI_ENABLED.key -> "false",
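As the parser test above encodes, bare `current_time` is only treated as the function under ANSI mode with reserved keywords enforced; otherwise it parses as an attribute and may resolve to a column of that name. A hedged spark-shell illustration (the config keys are the standard SQLConf names; outputs are illustrative, assuming a session with this patch):

```scala
scala> spark.conf.set("spark.sql.ansi.enabled", "true")
scala> spark.conf.set("spark.sql.ansi.enforceReservedKeywords", "true")
scala> spark.sql("SELECT current_time").columns
res0: Array[String] = Array(current_time(6))   // parsed as the function

scala> spark.conf.set("spark.sql.ansi.enabled", "false")
scala> spark.sql("SELECT current_time FROM VALUES (1) AS t(current_time)").columns
res1: Array[String] = Array(current_time)      // resolves to the column instead
```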
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 2a3f28fe496c..f2301c4526b8 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -104,6 +104,7 @@
 | org.apache.spark.sql.catalyst.expressions.CurrentDatabase | current_database | SELECT current_database() | struct<current_schema():string> |
 | org.apache.spark.sql.catalyst.expressions.CurrentDatabase | current_schema | SELECT current_schema() | struct<current_schema():string> |
 | org.apache.spark.sql.catalyst.expressions.CurrentDate | current_date | SELECT current_date() | struct<current_date():date> |
+| org.apache.spark.sql.catalyst.expressions.CurrentTime | current_time | SELECT current_time() | struct<current_time(6):time(6)> |
 | org.apache.spark.sql.catalyst.expressions.CurrentTimeZone | current_timezone | SELECT current_timezone() | struct<current_timezone():string> |
 | org.apache.spark.sql.catalyst.expressions.CurrentTimestamp | current_timestamp | SELECT current_timestamp() | struct<current_timestamp():timestamp> |
 | org.apache.spark.sql.catalyst.expressions.CurrentUser | current_user | SELECT current_user() | struct<current_user():string> |
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala
index e807ae306ce7..cd54161f54ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala
@@ -27,7 +27,7 @@ import org.apache.spark.ErrorMessageFormat.MINIMAL
 import org.apache.spark.SparkThrowableHelper.getMessage
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.IntegratedUDFTestUtils.{TestUDF, TestUDTFSet}
-import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestampLike, CurrentUser, Literal}
+import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTime, CurrentTimestampLike, CurrentUser, Literal}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.fileToString
@@ -101,6 +101,9 @@ trait SQLQueryTestHelper extends Logging {
       case expr: CurrentTimestampLike =>
         deterministic = false
         expr
+      case expr: CurrentTime =>
+        deterministic = false
+        expr
       case expr: CurrentUser =>
         deterministic = false
         expr
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index a7af22a0554e..e90907b904bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -209,6 +209,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
       "org.apache.spark.sql.catalyst.expressions.CurrentTimeZone",
       "org.apache.spark.sql.catalyst.expressions.Now",
       "org.apache.spark.sql.catalyst.expressions.LocalTimestamp",
+      "org.apache.spark.sql.catalyst.expressions.CurrentTime",
       // Random output without a seed
       "org.apache.spark.sql.catalyst.expressions.Rand",
       "org.apache.spark.sql.catalyst.expressions.Randn",

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org