This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 1de64a4f16b1 [SPARK-53032] Fix parquet format of shredded timestamp values within arrays 1de64a4f16b1 is described below commit 1de64a4f16b18b718cf9e4e52e8c798135717034 Author: Harsh Motwani <harsh.motw...@databricks.com> AuthorDate: Mon Aug 4 11:15:16 2025 +0800 [SPARK-53032] Fix parquet format of shredded timestamp values within arrays ### What changes were proposed in this pull request? This PR is an extension of the [previous PR](https://github.com/apache/spark/pull/51609) which did not account for the fact that timestamps within Variant arrays could be shredded as well. This PR makes sure that these timestamps are stored in compliance with the shredding spec. ### Why are the changes needed? Variants representing arrays of timestamps could be shredded and the written format must reflect the parquet spec. ### Does this PR introduce _any_ user-facing change? This PR must go in the same version as the [previous PR](https://github.com/apache/spark/pull/51609). The physical format of shredded timestamps within parquet files will be different. ### How was this patch tested? Incorporated `array<timestamp>` within previous unit test. ### Was this patch authored or co-authored using generative AI tooling? no Closes #51734 from harshmotw-db/harsh-motwani_data/shredding_timestamp_fix. 
Authored-by: Harsh Motwani <harsh.motw...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../datasources/parquet/ParquetWriteSupport.scala | 17 +++++++++------- .../parquet/ParquetVariantShreddingSuite.scala | 23 +++++++++++++++++++--- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index 240551db8c7d..2ab9fb64da43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -308,9 +308,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { writeFields(row.getStruct(ordinal, t.length), t, fieldWriters) } - case t: ArrayType => makeArrayWriter(t) + case t: ArrayType => makeArrayWriter(t, inShredded) - case t: MapType => makeMapWriter(t) + case t: MapType => makeMapWriter(t, inShredded) case t: UserDefinedType[_] => makeWriter(t.sqlType, inShredded) @@ -391,9 +391,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { } } - def makeArrayWriter(arrayType: ArrayType): ValueWriter = { + def makeArrayWriter(arrayType: ArrayType, inShredded: Boolean): ValueWriter = { // The shredded schema should not have an array inside - val elementWriter = makeWriter(arrayType.elementType, inShredded = false) + val elementWriter = makeWriter(arrayType.elementType, inShredded) def threeLevelArrayWriter(repeatedGroupName: String, elementFieldName: String): ValueWriter = (row: SpecializedGetters, ordinal: Int) => { @@ -472,9 +472,12 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { } } - private def makeMapWriter(mapType: MapType): ValueWriter = { - val keyWriter = makeWriter(mapType.keyType, inShredded = false) 
- val valueWriter = makeWriter(mapType.valueType, inShredded = false) + private def makeMapWriter(mapType: MapType, inShredded: Boolean): ValueWriter = { + // TODO: If maps are ever supported in the shredded schema, we should add a test in + // `ParquetVariantShreddingSuite` to make sure that timestamps within maps are shredded + // correctly as INT64. + val keyWriter = makeWriter(mapType.keyType, inShredded) + val valueWriter = makeWriter(mapType.valueType, inShredded) val repeatedGroupName = if (writeLegacyParquetFormat) { // Legacy mode: // diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala index 245c7beba300..c41d88ac552d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala @@ -47,17 +47,21 @@ class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with Share ParquetOutputTimestampType.values.foreach { timestampParquetType => withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> timestampParquetType.toString) { withTempDir { dir => - val schema = "t timestamp, st struct<t timestamp>" + val schema = "t timestamp, st struct<t timestamp>, at array<timestamp>" val fullSchema = "v struct<metadata binary, value binary, typed_value struct<" + "t struct<value binary, typed_value timestamp>," + "st struct<" + - "value binary, typed_value struct<t struct<value binary, typed_value timestamp>>>>>, " + + "value binary, typed_value struct<t struct<value binary, typed_value timestamp>>>," + + "at struct<" + + "value binary, typed_value array<struct<value binary, typed_value timestamp>>>" + + ">>, " + "t1 timestamp, st1 struct<t1 timestamp>" val df = spark.sql( """ | select | to_variant_object( - 
| named_struct('t', 1::timestamp, 'st', named_struct('t', 2::timestamp)) + | named_struct('t', 1::timestamp, 'st', named_struct('t', 2::timestamp), + | 'at', array(5::timestamp)) | ) v, 3::timestamp t1, named_struct('t1', 4::timestamp) st1 | from range(1) |""".stripMargin) @@ -82,6 +86,9 @@ class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with Share checkAnswer( shreddedDf.selectExpr("st1.t1::long"), Seq(Row(4))) + checkAnswer( + shreddedDf.selectExpr("v.typed_value.at.typed_value[0].typed_value::long"), + Seq(Row(5))) val file = dir.listFiles().find(_.getName.endsWith(".parquet")).get val parquetFilePath = file.getAbsolutePath val inputFile = HadoopInputFile.fromPath(new Path(parquetFilePath), new Configuration()) @@ -106,6 +113,16 @@ class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with Share assert(typedValue2.getLogicalTypeAnnotation == LogicalTypeAnnotation.timestampType( true, LogicalTypeAnnotation.TimeUnit.MICROS)) + // v.typed_value.at.typed_value[0].typed_value + val atGroup = typedValueGroup.getType("at").asGroupType() + val atTypedValueGroup = atGroup.getType("typed_value").asGroupType() + val atLGroup = atTypedValueGroup.getType("list").asGroupType() + val atLEGroup = atLGroup.getType("element").asGroupType() + val typedValue3 = atLEGroup.getType("typed_value").asPrimitiveType() + assert(typedValue3.getPrimitiveTypeName == PrimitiveTypeName.INT64) + assert(typedValue3.getLogicalTypeAnnotation == LogicalTypeAnnotation.timestampType( + true, LogicalTypeAnnotation.TimeUnit.MICROS)) + def verifyNonVariantTimestampType(t: PrimitiveType): Unit = { timestampParquetType match { case ParquetOutputTimestampType.INT96 => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org