This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c8c8c9a22edc [SPARK-51661][SQL] Partitions discovery of TIME column values
c8c8c9a22edc is described below

commit c8c8c9a22edc3ae81ffdb8ebf35e8a677a0a4cca
Author: Max Gekk <[email protected]>
AuthorDate: Mon Mar 31 08:19:53 2025 +0300

    [SPARK-51661][SQL] Partitions discovery of TIME column values
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to infer the `TIME` data type from partition values that match the pattern `HH:mm:ss[.SSSSSS]`. The fractional seconds part has a variable length; for example, the following values all match the pattern: `01:02:03.001`, `23:59:59` and `12:13:14.123456`.
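    
    A minimal sketch of this variable-length fraction parsing with plain `java.time` (an approximation for illustration only; Spark's internal `TimeFormatter` is not necessarily built this way):
    ```scala
    import java.time.LocalTime
    import java.time.format.DateTimeFormatterBuilder
    import java.time.temporal.ChronoField

    // An optional decimal point followed by 0 to 6 fractional digits,
    // mirroring `HH:mm:ss[.SSSSSS]` with a variable-length fraction.
    val parser = new DateTimeFormatterBuilder()
      .appendPattern("HH:mm:ss")
      .appendFraction(ChronoField.NANO_OF_SECOND, 0, 6, true)
      .toFormatter

    Seq("01:02:03.001", "23:59:59", "12:13:14.123456")
      .foreach(s => println(LocalTime.parse(s, parser)))
    ```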
    
    ### Why are the changes needed?
    Currently, Spark can save a dataset partitioned by a TIME column and read it back if a user sets the schema explicitly, but it cannot infer the TIME data type of the column automatically. For example:
    ```scala
    scala> sql("SELECT time'12:00' AS t, 0 as 
id").write.partitionBy("t").parquet("/Users/maxim.gekk/tmp/time_parquet2")
    scala> spark.read.parquet("/Users/maxim.gekk/tmp/time_parquet2").printSchema()
    root
     |-- id: integer (nullable = true)
     |-- t: string (nullable = true)
    ```
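    
    Setting the schema explicitly on read works around the `STRING` inference (a minimal sketch; `TimeType()` with its default microsecond precision is assumed to correspond to the `TIME(6)` shown below):
    ```scala
    import org.apache.spark.sql.types.{IntegerType, StructType, TimeType}

    // Spell out the schema so the partition column `t` is not inferred as STRING.
    val schema = new StructType()
      .add("id", IntegerType)
      .add("t", TimeType())
    spark.read.schema(schema).parquet("/Users/maxim.gekk/tmp/time_parquet2").printSchema()
    ```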
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. After the changes, the inferred type is `TIME(6)` instead of `STRING` for the example above:
    ```scala
    scala> spark.read.parquet("/Users/maxim.gekk/tmp/time_parquet2").printSchema()
    root
     |-- id: integer (nullable = true)
     |-- t: time(6) (nullable = true)
    ```
    
    ### How was this patch tested?
    By running the new tests:
    ```
    $ build/sbt "test:testOnly *ParquetV1PartitionDiscoverySuite"
    $ build/sbt "test:testOnly *ParquetV2PartitionDiscoverySuite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #50453 from MaxGekk/time-partitions-discovery.
    
    Authored-by: Max Gekk <[email protected]>
    Signed-off-by: Max Gekk <[email protected]>
---
 .../sql/catalyst/analysis/AnsiTypeCoercion.scala   |  2 +-
 .../spark/sql/catalyst/analysis/TypeCoercion.scala |  2 +-
 .../sql/catalyst/analysis/TypeCoercionHelper.scala | 13 +++--
 .../execution/datasources/PartitioningUtils.scala  | 62 +++++++++++++-------
 .../parquet/ParquetPartitionDiscoverySuite.scala   | 68 +++++++++++++++++++---
 5 files changed, 113 insertions(+), 34 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala
index aa977b240007..a98aaf702ace 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala
@@ -122,7 +122,7 @@ object AnsiTypeCoercion extends TypeCoercionBase {
         Some(widerType)
       }
 
-    case (d1: DatetimeType, d2: DatetimeType) => Some(findWiderDateTimeType(d1, d2))
+    case (d1: DatetimeType, d2: DatetimeType) => findWiderDateTimeType(d1, d2)
 
     case (t1: DayTimeIntervalType, t2: DayTimeIntervalType) =>
       Some(DayTimeIntervalType(t1.startField.min(t2.startField), t1.endField.max(t2.endField)))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 4769970b5142..f68084803fe7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -90,7 +90,7 @@ object TypeCoercion extends TypeCoercionBase {
         val index = numericPrecedence.lastIndexWhere(t => t == t1 || t == t2)
         Some(numericPrecedence(index))
 
-      case (d1: DatetimeType, d2: DatetimeType) => Some(findWiderDateTimeType(d1, d2))
+      case (d1: DatetimeType, d2: DatetimeType) => findWiderDateTimeType(d1, d2)
 
       case (t1: DayTimeIntervalType, t2: DayTimeIntervalType) =>
         Some(DayTimeIntervalType(t1.startField.min(t2.startField), t1.endField.max(t2.endField)))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionHelper.scala
index b11ec6d8fe67..390ff2f3114d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionHelper.scala
@@ -84,7 +84,8 @@ import org.apache.spark.sql.types.{
   StructType,
   TimestampNTZType,
   TimestampType,
-  TimestampTypeExpression
+  TimestampTypeExpression,
+  TimeType
 }
 
 abstract class TypeCoercionHelper {
@@ -239,16 +240,18 @@ abstract class TypeCoercionHelper {
     }
   }
 
-  protected def findWiderDateTimeType(d1: DatetimeType, d2: DatetimeType): DatetimeType =
+  protected def findWiderDateTimeType(d1: DatetimeType, d2: DatetimeType): Option[DatetimeType] =
     (d1, d2) match {
+      case (_, _: TimeType) => None
+      case (_: TimeType, _) => None
       case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) =>
-        TimestampType
+        Some(TimestampType)
 
       case (_: TimestampType, _: TimestampNTZType) | (_: TimestampNTZType, _: TimestampType) =>
-        TimestampType
+        Some(TimestampType)
 
       case (_: TimestampNTZType, _: DateType) | (_: DateType, _: TimestampNTZType) =>
-        TimestampNTZType
+        Some(TimestampNTZType)
     }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 6f39636b5f5e..1bc4645dfc43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.getPartitionValueString
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimeFormatter, TimestampFormatter}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -66,6 +66,7 @@ object PartitionSpec {
 
 object PartitioningUtils extends SQLConfHelper {
 
+  val timePartitionPattern = "HH:mm:ss[.SSSSSS]"
   val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
 
   case class TypedPartValue(value: String, dataType: DataType)
@@ -145,10 +146,11 @@ object PartitioningUtils extends SQLConfHelper {
       timestampPartitionPattern,
       zoneId,
       isParsing = true)
+    val timeFormatter = TimeFormatter(timePartitionPattern, isParsing = true)
     // First, we need to parse every partition's path and see if we can find partition values.
     val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
       parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes,
-        validatePartitionColumns, zoneId, dateFormatter, timestampFormatter)
+        validatePartitionColumns, zoneId, dateFormatter, timestampFormatter, timeFormatter)
     }.unzip
 
     // We create pairs of (path -> path's partition value) here
@@ -240,7 +242,8 @@ object PartitioningUtils extends SQLConfHelper {
       validatePartitionColumns: Boolean,
       zoneId: ZoneId,
       dateFormatter: DateFormatter,
-      timestampFormatter: TimestampFormatter): (Option[PartitionValues], Option[Path]) = {
+      timestampFormatter: TimestampFormatter,
+      timeFormatter: TimeFormatter): (Option[PartitionValues], Option[Path]) = {
     val columns = ArrayBuffer.empty[(String, TypedPartValue)]
     // Old Hadoop versions don't have `Path.isRoot`
     var finished = path.getParent == null
@@ -262,7 +265,7 @@ object PartitioningUtils extends SQLConfHelper {
         // Once we get the string, we try to parse it and find the partition column and value.
         val maybeColumn =
           parsePartitionColumn(currentPath.getName, typeInference, userSpecifiedDataTypes,
-            zoneId, dateFormatter, timestampFormatter)
+            zoneId, dateFormatter, timestampFormatter, timeFormatter)
         maybeColumn.foreach(columns += _)
 
         // Now, we determine if we should stop.
@@ -298,7 +301,8 @@ object PartitioningUtils extends SQLConfHelper {
       userSpecifiedDataTypes: Map[String, DataType],
       zoneId: ZoneId,
       dateFormatter: DateFormatter,
-      timestampFormatter: TimestampFormatter): Option[(String, TypedPartValue)] = {
+      timestampFormatter: TimestampFormatter,
+      timeFormatter: TimeFormatter): Option[(String, TypedPartValue)] = {
     val equalSignIndex = columnSpec.indexOf('=')
     if (equalSignIndex == -1) {
       None
@@ -319,7 +323,8 @@ object PartitioningUtils extends SQLConfHelper {
           typeInference,
           zoneId,
           dateFormatter,
-          timestampFormatter)
+          timestampFormatter,
+          timeFormatter)
       }
       Some(columnName -> TypedPartValue(rawColumnValue, dataType))
     }
@@ -427,22 +432,23 @@ object PartitioningUtils extends SQLConfHelper {
   /**
    * Converts a string to a [[Literal]] with automatic type inference. Currently only supports
    * [[NullType]], [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType]], [[DateType]]
-   * [[TimestampType]], and [[StringType]].
+   * [[TimestampType]], [[TimeType]] and [[StringType]].
    *
    * When resolving conflicts, it follows the table below:
    *
-   * +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+
-   * | InputA \ InputB    | NullType          | IntegerType       | LongType          | DecimalType(38,0)* | DoubleType | DateType      | TimestampType | StringType |
-   * +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+
-   * | NullType           | NullType          | IntegerType       | LongType          | DecimalType(38,0)  | DoubleType | DateType      | TimestampType | StringType |
-   * | IntegerType        | IntegerType       | IntegerType       | LongType          | DecimalType(38,0)  | DoubleType | StringType    | StringType    | StringType |
-   * | LongType           | LongType          | LongType          | LongType          | DecimalType(38,0)  | StringType | StringType    | StringType    | StringType |
-   * | DecimalType(38,0)* | DecimalType(38,0) | DecimalType(38,0) | DecimalType(38,0) | DecimalType(38,0)  | StringType | StringType    | StringType    | StringType |
-   * | DoubleType         | DoubleType        | DoubleType        | StringType        | StringType         | DoubleType | StringType    | StringType    | StringType |
-   * | DateType           | DateType          | StringType        | StringType        | StringType         | StringType | DateType      | TimestampType | StringType |
-   * | TimestampType      | TimestampType     | StringType        | StringType        | StringType         | StringType | TimestampType | TimestampType | StringType |
-   * | StringType         | StringType        | StringType        | StringType        | StringType         | StringType | StringType    | StringType    | StringType |
-   * +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+
+   * +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+------------+
+   * | InputA \ InputB    | NullType          | IntegerType       | LongType          | DecimalType(38,0)* | DoubleType | DateType      | TimestampType | StringType | TimeType   |
+   * +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+------------+
+   * | NullType           | NullType          | IntegerType       | LongType          | DecimalType(38,0)  | DoubleType | DateType      | TimestampType | StringType | TimeType   |
+   * | IntegerType        | IntegerType       | IntegerType       | LongType          | DecimalType(38,0)  | DoubleType | StringType    | StringType    | StringType | StringType |
+   * | LongType           | LongType          | LongType          | LongType          | DecimalType(38,0)  | StringType | StringType    | StringType    | StringType | StringType |
+   * | DecimalType(38,0)* | DecimalType(38,0) | DecimalType(38,0) | DecimalType(38,0) | DecimalType(38,0)  | StringType | StringType    | StringType    | StringType | StringType |
+   * | DoubleType         | DoubleType        | DoubleType        | StringType        | StringType         | DoubleType | StringType    | StringType    | StringType | StringType |
+   * | DateType           | DateType          | StringType        | StringType        | StringType         | StringType | DateType      | TimestampType | StringType | StringType |
+   * | TimeType           | TimeType          | StringType        | StringType        | StringType         | StringType | StringType    | StringType    | StringType | TimeType   |
+   * | TimestampType      | TimestampType     | StringType        | StringType        | StringType         | StringType | TimestampType | TimestampType | StringType | StringType |
+   * | StringType         | StringType        | StringType        | StringType        | StringType         | StringType | StringType    | StringType    | StringType | StringType |
+   * +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+------------+
    * Note that, for DecimalType(38,0)*, the table above intentionally does not cover all other
    * combinations of scales and precisions because currently we only infer decimal type like
    * `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
@@ -453,7 +459,8 @@ object PartitioningUtils extends SQLConfHelper {
       typeInference: Boolean,
       zoneId: ZoneId,
       dateFormatter: DateFormatter,
-      timestampFormatter: TimestampFormatter): DataType = {
+      timestampFormatter: TimestampFormatter,
+      timeFormatter: TimeFormatter): DataType = {
     val decimalTry = Try {
       // `BigDecimal` conversion can fail when the `field` is not a form of number.
       val bigDecimal = new JBigDecimal(raw)
@@ -499,6 +506,20 @@ object PartitioningUtils extends SQLConfHelper {
       timestampType
     }
 
+    val timeTry = Try {
+      val unescapedRaw = unescapePathName(raw)
+      // try and parse the time, if no exception occurs this is a candidate to be resolved as
+      // TimeType
+      timeFormatter.parse(unescapedRaw)
+      // We need to check that we can cast the raw string since we later can use Cast to get
+      // the partition values with the right DataType (see
+      // org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning)
+      val timeValue = Cast(Literal(unescapedRaw), TimeType(), Some(zoneId.getId)).eval()
+      // Disallow TimeType if the cast returned null
+      require(timeValue != null)
+      TimeType()
+    }
+
     if (typeInference) {
       // First tries integral types
       Try({ Integer.parseInt(raw); IntegerType })
@@ -509,6 +530,7 @@ object PartitioningUtils extends SQLConfHelper {
         // Then falls back to date/timestamp types
         .orElse(timestampTry)
         .orElse(dateTry)
+        .orElse(timeTry)
         // Then falls back to string
         .getOrElse {
           if (raw == DEFAULT_PARTITION_NAME) NullType else StringType
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 206fe0b79120..6f5855461fcc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.io.File
 import java.math.BigInteger
 import java.sql.Timestamp
-import java.time.{LocalDateTime, ZoneId, ZoneOffset}
+import java.time.{LocalDateTime, LocalTime, ZoneId, ZoneOffset}
 import java.util.Locale
 
 import com.google.common.io.Files
@@ -32,7 +32,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimeFormatter, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateTimeToMicros
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
@@ -60,6 +60,7 @@ abstract class ParquetPartitionDiscoverySuite
   val df = DateFormatter()
   val tf = TimestampFormatter(
     timestampPartitionPattern, timeZoneId, isParsing = true)
+  val tif = TimeFormatter(timePartitionPattern, isParsing = true)
 
   protected override def beforeAll(): Unit = {
     super.beforeAll()
@@ -73,7 +74,7 @@ abstract class ParquetPartitionDiscoverySuite
 
   test("column type inference") {
     def check(raw: String, dataType: DataType, zoneId: ZoneId = timeZoneId): Unit = {
-      assert(inferPartitionColumnValue(raw, true, zoneId, df, tf) === dataType)
+      assert(inferPartitionColumnValue(raw, true, zoneId, df, tf, tif) === dataType)
     }
 
     check("10", IntegerType)
@@ -214,13 +215,14 @@ abstract class ParquetPartitionDiscoverySuite
   test("parse partition") {
     def check(path: String, expected: Option[PartitionValues]): Unit = {
       val actual = parsePartition(new Path(path), true, Set.empty[Path],
-        Map.empty, true, timeZoneId, df, tf)._1
+        Map.empty, true, timeZoneId, df, tf, tif)._1
       assert(expected === actual)
     }
 
     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), true, Set.empty[Path], Map.empty, true, timeZoneId, df, tf)
+        parsePartition(new Path(path), true, Set.empty[Path], Map.empty, true, timeZoneId,
+          df, tf, tif)
       }.getMessage
 
       assert(message.contains(expected))
@@ -268,7 +270,8 @@ abstract class ParquetPartitionDiscoverySuite
       true,
       zoneId = timeZoneId,
       df,
-      tf)._1
+      tf,
+      tif)._1
 
     assert(partitionSpec1.isEmpty)
 
@@ -281,7 +284,8 @@ abstract class ParquetPartitionDiscoverySuite
       true,
       zoneId = timeZoneId,
       df,
-      tf)._1
+      tf,
+      tif)._1
 
     assert(partitionSpec2 ==
       Option(PartitionValues(
@@ -1187,6 +1191,56 @@ abstract class ParquetPartitionDiscoverySuite
       assert("Partition column name list #[0-1]: col1, 
col2".r.findFirstIn(msg).isDefined)
     }
   }
+
+  test("Infer the TIME data type from partition values") {
+    val df = Seq(
+      (1, LocalTime.parse("00:00:00")),
+      (2, LocalTime.parse("23:00:00.9")),
+      (3, LocalTime.parse("10:11:12.001")),
+      (4, LocalTime.parse("23:59:59.999999"))
+    ).toDF("i", "time")
+
+    withTempPath { path =>
+      df.write.format("parquet").partitionBy("time").save(path.getAbsolutePath)
+      checkAnswer(spark.read.load(path.getAbsolutePath), df)
+    }
+  }
+
+  test("Invalid times should be inferred as STRING in partition inference") {
+    withTempPath { path =>
+      val data = Seq(("1", "10:61", "12:13:14:04", "T00:01:02", "test"))
+        .toDF("id", "time_min", "time_sec", "time_prefix", "data")
+
+      data.write.partitionBy("time_min", "time_sec", "time_prefix").parquet(path.getAbsolutePath)
+      val input = spark.read.parquet(path.getAbsolutePath).select("id",
+        "time_min", "time_sec", "time_prefix", "data")
+
+      assert(DataTypeUtils.sameType(data.schema, input.schema))
+      checkAnswer(input, data)
+    }
+  }
+
+  test("Resolve type conflicts between strings and time in partition column") {
+    val df = Seq(
+      (1, "00:00:00"),
+      (2, "23:59:00.001"),
+      (3, "blah")).toDF("i", "str")
+
+    withTempPath { path =>
+      df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath)
+      checkAnswer(spark.read.load(path.getAbsolutePath), df)
+    }
+  }
+
+  test("Resolve type conflicts between times and timestamps in partition 
column") {
+    withTempPath { path =>
+      val df = Seq((1, "23:59:59"), (2, "00:01:00"), (3, "2015-01-01 00:01:00")).toDF("i", "ts")
+      df.write.format("parquet").partitionBy("ts").save(path.getAbsolutePath)
+      checkAnswer(
+        spark.read.load(path.getAbsolutePath),
        Row(1, "23:59:59") :: Row(2, "00:01:00") :: Row(3, "2015-01-01 00:01:00") :: Nil)
+    }
+  }
 }
 
 class ParquetV1PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
