This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 50afb735dbc9 [SPARK-51649][SQL] Dynamic writes/reads of TIME partitions
50afb735dbc9 is described below

commit 50afb735dbc9ca7a04be15f8b36469771b6b1efd
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Fri Mar 28 22:29:32 2025 +0300

    [SPARK-51649][SQL] Dynamic writes/reads of TIME partitions
    
    ### What changes were proposed in this pull request?
    This PR adds support for partition values of the TIME type. In particular, 
when the desired partition column type is `TimeType(n)`, the partition value is 
unescaped from its path form and cast to that type using the `Cast` expression.
    
    ### Why are the changes needed?
    To fix the following failure when reading data partitioned by a TIME column:
    ```scala
    scala> spark.read.schema("t TIME, id 
INT").parquet("/Users/maxim.gekk/tmp/time_parquet").show(false)
    org.apache.spark.SparkRuntimeException: [INVALID_PARTITION_VALUE] Failed to 
cast value '12%3A00%3A00' to data type "TIME(6)" for partition column `t`. 
Ensure the value matches the expected data type for this partition column. 
SQLSTATE: 42846
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By running the new test:
    ```
    $ build/sbt "test:testOnly *PartitionedWriteSuite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #50442 from MaxGekk/read-time-partition.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../execution/datasources/PartitioningUtils.scala  |  1 +
 .../spark/sql/sources/PartitionedWriteSuite.scala  | 27 ++++++++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 402b70065d8e..6f39636b5f5e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -534,6 +534,7 @@ object PartitioningUtils extends SQLConfHelper {
     case _: DecimalType => Literal(new JBigDecimal(value)).value
     case DateType =>
       Cast(Literal(value), DateType, Some(zoneId.getId)).eval()
+    case tt: TimeType => Cast(Literal(unescapePathName(value)), tt).eval()
     // Timestamp types
     case dt if AnyTimestampType.acceptsType(dt) =>
       Try {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index f3849fe34ec2..b18d8f816e30 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -223,6 +223,33 @@ class PartitionedWriteSuite extends QueryTest with 
SharedSparkSession {
       }
     }
   }
+
+  test("Dynamic writes/reads of TIME partitions") {
+    Seq(
+      "00:00:00" -> TimeType(0),
+      "00:00:00.00109" -> TimeType(5),
+      "00:01:02.999999" -> TimeType(6),
+      "12:00:00" -> TimeType(1),
+      "23:59:59.000001" -> TimeType(),
+      "23:59:59.999999" -> TimeType(6)
+    ).foreach { case (timeStr, timeType) =>
+      withTempPath { f =>
+        val df = sql(s"select 0 AS id, cast('$timeStr' as ${timeType.sql}) AS 
tt")
+        assert(df.schema("tt").dataType === timeType)
+        df.write
+          .partitionBy("tt")
+          .format("parquet")
+          .save(f.getAbsolutePath)
+        val files = 
TestUtils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+        assert(files.length == 1)
+        checkPartitionValues(files.head, timeStr)
+        val schema = new StructType()
+          .add("id", IntegerType)
+          .add("tt", timeType)
+        
checkAnswer(spark.read.schema(schema).format("parquet").load(f.getAbsolutePath),
 df)
+      }
+    }
+  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to