This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1de64a4f16b1 [SPARK-53032] Fix parquet format of shredded timestamp 
values within arrays
1de64a4f16b1 is described below

commit 1de64a4f16b18b718cf9e4e52e8c798135717034
Author: Harsh Motwani <harsh.motw...@databricks.com>
AuthorDate: Mon Aug 4 11:15:16 2025 +0800

    [SPARK-53032] Fix parquet format of shredded timestamp values within arrays
    
    ### What changes were proposed in this pull request?
    
    This PR is an extension of the [previous 
PR](https://github.com/apache/spark/pull/51609) which did not account for the 
fact that timestamps within Variant arrays could be shredded as well. This PR 
makes sure that these timestamps are stored in compliance with the shredding spec.
    
    ### Why are the changes needed?
    
    Variants representing arrays of timestamps could be shredded, and the 
written format must conform to the Parquet shredding spec.
    
    ### Does this PR introduce _any_ user-facing change?
    
    This PR must go in the same version as the [previous 
PR](https://github.com/apache/spark/pull/51609). The physical format of 
shredded timestamps within parquet files will be different.
    
    ### How was this patch tested?
    
    Incorporated `array<timestamp>` into the previous unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #51734 from harshmotw-db/harsh-motwani_data/shredding_timestamp_fix.
    
    Authored-by: Harsh Motwani <harsh.motw...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/parquet/ParquetWriteSupport.scala  | 17 +++++++++-------
 .../parquet/ParquetVariantShreddingSuite.scala     | 23 +++++++++++++++++++---
 2 files changed, 30 insertions(+), 10 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 240551db8c7d..2ab9fb64da43 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -308,9 +308,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] 
with Logging {
             writeFields(row.getStruct(ordinal, t.length), t, fieldWriters)
           }
 
-      case t: ArrayType => makeArrayWriter(t)
+      case t: ArrayType => makeArrayWriter(t, inShredded)
 
-      case t: MapType => makeMapWriter(t)
+      case t: MapType => makeMapWriter(t, inShredded)
 
       case t: UserDefinedType[_] => makeWriter(t.sqlType, inShredded)
 
@@ -391,9 +391,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] 
with Logging {
     }
   }
 
-  def makeArrayWriter(arrayType: ArrayType): ValueWriter = {
+  def makeArrayWriter(arrayType: ArrayType, inShredded: Boolean): ValueWriter 
= {
     // The shredded schema should not have an array inside
-    val elementWriter = makeWriter(arrayType.elementType, inShredded = false)
+    val elementWriter = makeWriter(arrayType.elementType, inShredded)
 
     def threeLevelArrayWriter(repeatedGroupName: String, elementFieldName: 
String): ValueWriter =
       (row: SpecializedGetters, ordinal: Int) => {
@@ -472,9 +472,12 @@ class ParquetWriteSupport extends 
WriteSupport[InternalRow] with Logging {
     }
   }
 
-  private def makeMapWriter(mapType: MapType): ValueWriter = {
-    val keyWriter = makeWriter(mapType.keyType, inShredded = false)
-    val valueWriter = makeWriter(mapType.valueType, inShredded = false)
+  private def makeMapWriter(mapType: MapType, inShredded: Boolean): 
ValueWriter = {
+    // TODO: If maps are ever supported in the shredded schema, we should add 
a test in
+    //  `ParquetVariantShreddingSuite` to make sure that timestamps within 
maps are shredded
+    //  correctly as INT64.
+    val keyWriter = makeWriter(mapType.keyType, inShredded)
+    val valueWriter = makeWriter(mapType.valueType, inShredded)
     val repeatedGroupName = if (writeLegacyParquetFormat) {
       // Legacy mode:
       //
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
index 245c7beba300..c41d88ac552d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
@@ -47,17 +47,21 @@ class ParquetVariantShreddingSuite extends QueryTest with 
ParquetTest with Share
     ParquetOutputTimestampType.values.foreach { timestampParquetType =>
       withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> 
timestampParquetType.toString) {
         withTempDir { dir =>
-          val schema = "t timestamp, st struct<t timestamp>"
+          val schema = "t timestamp, st struct<t timestamp>, at 
array<timestamp>"
           val fullSchema = "v struct<metadata binary, value binary, 
typed_value struct<" +
             "t struct<value binary, typed_value timestamp>," +
             "st struct<" +
-            "value binary, typed_value struct<t struct<value binary, 
typed_value timestamp>>>>>, " +
+            "value binary, typed_value struct<t struct<value binary, 
typed_value timestamp>>>," +
+            "at struct<" +
+              "value binary, typed_value array<struct<value binary, 
typed_value timestamp>>>" +
+            ">>, " +
             "t1 timestamp, st1 struct<t1 timestamp>"
           val df = spark.sql(
             """
               | select
               |   to_variant_object(
-              |     named_struct('t', 1::timestamp, 'st', named_struct('t', 
2::timestamp))
+              |     named_struct('t', 1::timestamp, 'st', named_struct('t', 
2::timestamp),
+              |     'at', array(5::timestamp))
               |   ) v, 3::timestamp t1, named_struct('t1', 4::timestamp) st1
               | from range(1)
               |""".stripMargin)
@@ -82,6 +86,9 @@ class ParquetVariantShreddingSuite extends QueryTest with 
ParquetTest with Share
             checkAnswer(
               shreddedDf.selectExpr("st1.t1::long"),
               Seq(Row(4)))
+            checkAnswer(
+              
shreddedDf.selectExpr("v.typed_value.at.typed_value[0].typed_value::long"),
+              Seq(Row(5)))
             val file = dir.listFiles().find(_.getName.endsWith(".parquet")).get
             val parquetFilePath = file.getAbsolutePath
             val inputFile = HadoopInputFile.fromPath(new 
Path(parquetFilePath), new Configuration())
@@ -106,6 +113,16 @@ class ParquetVariantShreddingSuite extends QueryTest with 
ParquetTest with Share
             assert(typedValue2.getLogicalTypeAnnotation == 
LogicalTypeAnnotation.timestampType(
               true, LogicalTypeAnnotation.TimeUnit.MICROS))
 
+            // v.typed_value.at.typed_value[0].typed_value
+            val atGroup = typedValueGroup.getType("at").asGroupType()
+            val atTypedValueGroup = 
atGroup.getType("typed_value").asGroupType()
+            val atLGroup = atTypedValueGroup.getType("list").asGroupType()
+            val atLEGroup = atLGroup.getType("element").asGroupType()
+            val typedValue3 = 
atLEGroup.getType("typed_value").asPrimitiveType()
+            assert(typedValue3.getPrimitiveTypeName == PrimitiveTypeName.INT64)
+            assert(typedValue3.getLogicalTypeAnnotation == 
LogicalTypeAnnotation.timestampType(
+              true, LogicalTypeAnnotation.TimeUnit.MICROS))
+
             def verifyNonVariantTimestampType(t: PrimitiveType): Unit = {
               timestampParquetType match {
                 case ParquetOutputTimestampType.INT96 =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to