This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ea8b6fdde977 [SPARK-53107][SQL] Implement the time_trunc function in Scala ea8b6fdde977 is described below commit ea8b6fdde9772b73aabca053d403e8d679e68beb Author: Uros Bojanic <uros.boja...@databricks.com> AuthorDate: Wed Aug 6 18:29:30 2025 +0800 [SPARK-53107][SQL] Implement the time_trunc function in Scala ### What changes were proposed in this pull request? Implement the `time_trunc` function in Scala API. ### Why are the changes needed? Expand API support for the `TimeTrunc` expression. ### Does this PR introduce _any_ user-facing change? Yes, the new function is now available in Scala API. ### How was this patch tested? Added appropriate Scala function tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #51823 from uros-db/scala-time_trunc. Authored-by: Uros Bojanic <uros.boja...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- python/pyspark/sql/tests/test_functions.py | 5 +- .../scala/org/apache/spark/sql/functions.scala | 21 ++++++++ .../apache/spark/sql/TimeFunctionsSuiteBase.scala | 56 +++++++++++++++++++++- 3 files changed, 80 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index 99ab1fed0f4d..ca6c2233ef87 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -81,7 +81,10 @@ class FunctionsTestsMixin: missing_in_py = jvm_fn_set.difference(py_fn_set) # Functions that we expect to be missing in python until they are added to pyspark - expected_missing_in_py = set() + expected_missing_in_py = set( + # TODO(SPARK-53107): Implement the time_trunc function in Python + ["time_trunc"] + ) self.assertEqual( expected_missing_in_py, missing_in_py, "Missing functions in pyspark not as expected" diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala index 4d8f658ca32d..49fa45ed02cb 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala @@ -6292,6 +6292,27 @@ object functions { def timestamp_add(unit: String, quantity: Column, ts: Column): Column = Column.internalFn("timestampadd", lit(unit), quantity, ts) + /** + * Returns `time` truncated to the `unit`. + * + * @param unit + * A STRING representing the unit to truncate the time to. Supported units are: "HOUR", + * "MINUTE", "SECOND", "MILLISECOND", and "MICROSECOND". The unit is case-insensitive. + * @param time + * A TIME to truncate. + * @return + * A TIME truncated to the specified unit. + * @note + * If any of the inputs is `NULL`, the result is `NULL`. + * @throws IllegalArgumentException + * If the `unit` is not supported. + * @group datetime_funcs + * @since 4.1.0 + */ + def time_trunc(unit: Column, time: Column): Column = { + Column.fn("time_trunc", unit, time) + } + /** * Parses the `timestamp` expression with the `format` expression to a timestamp without time * zone. Returns null with invalid input. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala index 8506ab4527c9..005bfcb13d2e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import java.time.LocalTime import java.time.temporal.ChronoUnit -import org.apache.spark.{SparkConf, SparkDateTimeException} +import org.apache.spark.{SparkConf, SparkDateTimeException, SparkIllegalArgumentException} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -241,6 +241,60 @@ abstract class TimeFunctionsSuiteBase extends QueryTest with SharedSparkSession checkAnswer(result2, expected) } + test("SPARK-53107: time_trunc function") { + // Input data for the function (including null values). + val schema = StructType(Seq( + StructField("unit", StringType), + StructField("time", TimeType()) + )) + val data = Seq( + Row("HOUR", LocalTime.parse("00:00:00")), + Row("second", LocalTime.parse("01:02:03.4")), + Row("MicroSecond", LocalTime.parse("23:59:59.999999")), + Row(null, LocalTime.parse("01:02:03")), + Row("MiNuTe", null), + Row(null, null) + ) + val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + + // Test the function using both `selectExpr` and `select`. + val result1 = df.selectExpr( + "time_trunc(unit, time)" + ) + val result2 = df.select( + time_trunc(col("unit"), col("time")) + ) + // Check that both methods produce the same result. + checkAnswer(result1, result2) + + // Expected output of the function. + val expected = Seq( + "00:00:00", + "01:02:03", + "23:59:59.999999", + null, + null, + null + ).toDF("timeString").select(col("timeString").cast("time")) + // Check that the results match the expected output. + checkAnswer(result1, expected) + checkAnswer(result2, expected) + + // Error is thrown for malformed input. + val invalidUnitDF = Seq(("invalid_unit", LocalTime.parse("01:02:03"))).toDF("unit", "time") + checkError( + exception = intercept[SparkIllegalArgumentException] { + invalidUnitDF.select(time_trunc(col("unit"), col("time"))).collect() + }, + condition = "INVALID_PARAMETER_VALUE.TIME_UNIT", + parameters = Map( + "functionName" -> "`time_trunc`", + "parameter" -> "`unit`", + "invalidValue" -> "'invalid_unit'" + ) + ) + } + test("SPARK-52883: to_time function without format") { // Input data for the function. val schema = StructType(Seq( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org