This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new ecfbaa89166f [SPARK-50614][FOLLOW-UP] Fix bug where shredded timestamp values did not conform to the Parquet Variant Shredding spec
ecfbaa89166f is described below

commit ecfbaa89166f799ee252ef8c7aec83fe7a78c977
Author: Harsh Motwani <harsh.motw...@databricks.com>
AuthorDate: Tue Jul 22 20:50:37 2025 +0800

    [SPARK-50614][FOLLOW-UP] Fix bug where shredded timestamp values did not conform to the Parquet Variant Shredding spec

    ### What changes were proposed in this pull request?

    This PR ensures that shredded timestamp values are always written as INT64 with microsecond precision.

    ### Why are the changes needed?

    The Parquet spec for [Variant Shredding](https://github.com/apache/parquet-format/blob/37b6e8b863fb510314c07649665251f6474b0c11/VariantShredding.md) (not yet finalized) states that timestamp values must always be stored in the INT64 physical type. The Spark timestamp type always has microsecond granularity, so we annotate this primitive type as a microsecond timestamp.

    ### Does this PR introduce _any_ user-facing change?

    Not that I can think of, but the physical primitive type of shredded timestamp values is going to be different.

    ### How was this patch tested?

    Unit tests showing that:
    1. Top-level and nested shredded timestamp values are represented correctly regardless of SQLConf.
    2. Top-level and nested timestamp values that aren't part of a shredded variant are represented in the Parquet file based on the SQLConf.

    ### Was this patch authored or co-authored using generative AI tooling?

    Yes, a little bit of Copilot.

    Closes #51609 from harshmotw-db/harsh-motwani_data/shredding_timestamp_fix.

    Authored-by: Harsh Motwani <harsh.motw...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 1c5908e84639ad72ce08d169df5362f1aa468e44)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/parquet/ParquetReadSupport.scala   |   6 +-
 .../parquet/ParquetSchemaConverter.scala           |  69 ++++++++------
 .../datasources/parquet/ParquetWriteSupport.scala  |  69 ++++++++------
 .../sql/errors/QueryCompilationErrorsSuite.scala   |   2 +-
 .../parquet/ParquetVariantShreddingSuite.scala     | 106 +++++++++++++++++++++
 5 files changed, 194 insertions(+), 58 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index af0bf0d51f07..09fd0eccec4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -423,7 +423,7 @@ object ParquetReadSupport extends Logging {
       caseSensitiveParquetFieldMap
         .get(f.name)
         .map(clipParquetType(_, f.dataType, caseSensitive, useFieldId))
-        .getOrElse(toParquet.convertField(f))
+        .getOrElse(toParquet.convertField(f, inShredded = false))
     }
 
     def matchCaseInsensitiveField(f: StructField): Type = {
@@ -439,7 +439,7 @@
         } else {
           clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
         }
-      }.getOrElse(toParquet.convertField(f))
+      }.getOrElse(toParquet.convertField(f, inShredded = false))
     }
 
     def matchIdField(f: StructField): Type = {
@@ -458,7 +458,7 @@
       }.getOrElse {
         // When there is no ID match, we use a fake name to avoid a name match by accident
         // We need this name to be unique as well, otherwise there will be type conflicts
-        toParquet.convertField(f.copy(name = generateFakeColumnName))
+        toParquet.convertField(f.copy(name = generateFakeColumnName), inShredded = false)
       }
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index fcc8e76f73ec..a9b7e2f4dc9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -546,15 +546,16 @@ class SparkToParquetSchemaConverter(
   def convert(catalystSchema: StructType): MessageType = {
     Types
       .buildMessage()
-      .addFields(catalystSchema.map(convertField): _*)
+      .addFields(catalystSchema.map(f => convertField(f, inShredded = false)): _*)
       .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
   }
 
   /**
-   * Converts a Spark SQL [[StructField]] to a Parquet [[Type]].
+   * Converts a Spark SQL [[StructField]] to a Parquet [[Type]]. `inShredded` indicates whether
+   * the field is within a shredded Variant schema.
    */
-  def convertField(field: StructField): Type = {
-    val converted = convertField(field, if (field.nullable) OPTIONAL else REQUIRED)
+  def convertField(field: StructField, inShredded: Boolean): Type = {
+    val converted = convertField(field, if (field.nullable) OPTIONAL else REQUIRED, inShredded)
     if (useFieldId && ParquetUtils.hasFieldId(field)) {
       converted.withId(ParquetUtils.getFieldId(field))
     } else {
@@ -562,7 +563,10 @@ class SparkToParquetSchemaConverter(
     }
   }
 
-  private def convertField(field: StructField, repetition: Type.Repetition): Type = {
+  private def convertField(
+      field: StructField,
+      repetition: Type.Repetition,
+      inShredded: Boolean): Type = {
     field.dataType match {
 
       // ===================
@@ -614,16 +618,26 @@ class SparkToParquetSchemaConverter(
       // from Spark 1.5.0, we resort to a timestamp type with microsecond precision so that we can
       // store a timestamp into a `Long`. This design decision is subject to change though, for
       // example, we may resort to nanosecond precision in the future.
+      //
+      // The Parquet Variant Shredding spec states that timestamps coming from shredded Variants
+      // must be stored as INT64, and the Spark Timestamp type always has microsecond precision,
+      // therefore, the TIMESTAMP_MICROS configuration is used when writing shredded variant
+      // timestamp subfields.
       case TimestampType =>
-        outputTimestampType match {
-          case SQLConf.ParquetOutputTimestampType.INT96 =>
-            Types.primitive(INT96, repetition).named(field.name)
-          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
-            Types.primitive(INT64, repetition)
-              .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MICROS)).named(field.name)
-          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
-            Types.primitive(INT64, repetition)
-              .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS)).named(field.name)
+        if (inShredded) {
+          Types.primitive(INT64, repetition)
+            .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MICROS)).named(field.name)
+        } else {
+          outputTimestampType match {
+            case SQLConf.ParquetOutputTimestampType.INT96 =>
+              Types.primitive(INT96, repetition).named(field.name)
+            case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
+              Types.primitive(INT64, repetition)
+                .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MICROS)).named(field.name)
+            case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
+              Types.primitive(INT64, repetition)
+                .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS)).named(field.name)
+          }
         }
 
       case TimestampNTZType =>
@@ -699,7 +713,7 @@ class SparkToParquetSchemaConverter(
         .buildGroup(repetition).as(LogicalTypeAnnotation.listType())
         .addField(Types
           .buildGroup(REPEATED)
-          .addField(convertField(StructField("array", elementType, nullable)))
+          .addField(convertField(StructField("array", elementType, nullable), inShredded))
           .named("bag"))
         .named(field.name)
 
@@ -715,7 +729,7 @@ class SparkToParquetSchemaConverter(
       Types
         .buildGroup(repetition).as(LogicalTypeAnnotation.listType())
         // "array" is the name chosen by parquet-avro (1.7.0 and prior version)
-        .addField(convertField(StructField("array", elementType, nullable), REPEATED))
+        .addField(convertField(StructField("array", elementType, nullable), REPEATED, inShredded))
         .named(field.name)
 
       // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by
@@ -730,8 +744,8 @@ class SparkToParquetSchemaConverter(
       ConversionPatterns.mapType(
         repetition,
         field.name,
-        convertField(StructField("key", keyType, nullable = false)),
-        convertField(StructField("value", valueType, valueContainsNull)))
+        convertField(StructField("key", keyType, nullable = false), inShredded),
+        convertField(StructField("value", valueType, valueContainsNull), inShredded))
 
       // =====================================
       // ArrayType and MapType (standard mode)
@@ -747,7 +761,7 @@ class SparkToParquetSchemaConverter(
        .buildGroup(repetition).as(LogicalTypeAnnotation.listType())
        .addField(
          Types.repeatedGroup()
-            .addField(convertField(StructField("element", elementType, containsNull)))
+            .addField(convertField(StructField("element", elementType, containsNull), inShredded))
            .named("list"))
        .named(field.name)
 
@@ -763,9 +777,10 @@ class SparkToParquetSchemaConverter(
        .addField(
          Types
            .repeatedGroup()
-            .addField(convertField(StructField("key", keyType, nullable = false)))
-            .addField(convertField(StructField("value", valueType, valueContainsNull)))
-            .named("key_value"))
+            .addField(convertField(StructField("key", keyType, nullable = false), inShredded))
+            .addField(
+              convertField(StructField("value", valueType, valueContainsNull), inShredded)
+            ).named("key_value"))
        .named(field.name)
 
       // ===========
@@ -774,25 +789,25 @@ class SparkToParquetSchemaConverter(
 
       case VariantType =>
         Types.buildGroup(repetition)
-        .addField(convertField(StructField("value", BinaryType, nullable = false)))
-        .addField(convertField(StructField("metadata", BinaryType, nullable = false)))
+        .addField(convertField(StructField("value", BinaryType, nullable = false), inShredded))
+        .addField(convertField(StructField("metadata", BinaryType, nullable = false), inShredded))
          .named(field.name)
 
       case s: StructType if SparkShreddingUtils.isVariantShreddingStruct(s) =>
         // Variant struct takes a Variant and writes to Parquet as a shredded schema.
         val group = Types.buildGroup(repetition)
         s.fields.foreach { f =>
-          group.addField(convertField(f))
+          group.addField(convertField(f, inShredded = true))
         }
         group.named(field.name)
 
       case StructType(fields) =>
         fields.foldLeft(Types.buildGroup(repetition)) { (builder, field) =>
-          builder.addField(convertField(field))
+          builder.addField(convertField(field, inShredded))
         }.named(field.name)
 
       case udt: UserDefinedType[_] =>
-        convertField(field.copy(dataType = udt.sqlType))
+        convertField(field.copy(dataType = udt.sqlType), inShredded)
 
       case _ =>
         throw QueryCompilationErrors.cannotConvertDataTypeToParquetTypeError(field)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 35eb57a2e4fb..104d566e47b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -122,7 +122,8 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
       SQLConf.ParquetOutputTimestampType.withName(configuration.get(key))
     }
 
-    this.rootFieldWriters = shreddedSchema.map(_.dataType).map(makeWriter).toArray[ValueWriter]
+    this.rootFieldWriters =
+      shreddedSchema.map(_.dataType).map(makeWriter(_, inShredded = false)).toArray[ValueWriter]
 
     val messageType = new SparkToParquetSchemaConverter(configuration).convert(shreddedSchema)
     val metadata = Map(
@@ -189,7 +190,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
     }
   }
 
-  private def makeWriter(dataType: DataType): ValueWriter = {
+  // `inShredded` indicates whether the current traversal is nested within a shredded Variant
+  // schema. This affects how timestamp values are written.
+  private def makeWriter(dataType: DataType, inShredded: Boolean): ValueWriter = {
     dataType match {
       case BooleanType =>
         (row: SpecializedGetters, ordinal: Int) =>
@@ -229,25 +232,34 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
             Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))
 
       case TimestampType =>
-        outputTimestampType match {
-          case SQLConf.ParquetOutputTimestampType.INT96 =>
-            (row: SpecializedGetters, ordinal: Int) =>
-              val micros = int96RebaseFunc(row.getLong(ordinal))
-              val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(micros)
-              val buf = ByteBuffer.wrap(timestampBuffer)
-              buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
-              recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
-
-          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
-            (row: SpecializedGetters, ordinal: Int) =>
-              val micros = row.getLong(ordinal)
-              recordConsumer.addLong(timestampRebaseFunc(micros))
-
-          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
-            (row: SpecializedGetters, ordinal: Int) =>
-              val micros = row.getLong(ordinal)
-              val millis = DateTimeUtils.microsToMillis(timestampRebaseFunc(micros))
-              recordConsumer.addLong(millis)
+        if (inShredded) {
+          // The Parquet Variant Shredding schema states that timestamp types must be stored as
+          // INT64. The Spark Timestamp type always has microsecond granularity and therefore, we
+          // always write shredded timestamps in the TIMESTAMP_MICROS configuration.
+          (row: SpecializedGetters, ordinal: Int) =>
+            val micros = row.getLong(ordinal)
+            recordConsumer.addLong(timestampRebaseFunc(micros))
+        } else {
+          outputTimestampType match {
+            case SQLConf.ParquetOutputTimestampType.INT96 =>
+              (row: SpecializedGetters, ordinal: Int) =>
+                val micros = int96RebaseFunc(row.getLong(ordinal))
+                val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(micros)
+                val buf = ByteBuffer.wrap(timestampBuffer)
+                buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
+                recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
+
+            case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
+              (row: SpecializedGetters, ordinal: Int) =>
+                val micros = row.getLong(ordinal)
+                recordConsumer.addLong(timestampRebaseFunc(micros))
+
+            case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
+              (row: SpecializedGetters, ordinal: Int) =>
+                val micros = row.getLong(ordinal)
+                val millis = DateTimeUtils.microsToMillis(timestampRebaseFunc(micros))
+                recordConsumer.addLong(millis)
+          }
         }
 
       case TimestampNTZType =>
@@ -275,7 +287,8 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
         }
 
       case s: StructType if SparkShreddingUtils.isVariantShreddingStruct(s) =>
-        val fieldWriters = s.map(_.dataType).map(makeWriter).toArray[ValueWriter]
+        val fieldWriters =
+          s.map(_.dataType).map(makeWriter(_, inShredded = true)).toArray[ValueWriter]
         val variantShreddingSchema = SparkShreddingUtils.buildVariantSchema(s)
         (row: SpecializedGetters, ordinal: Int) =>
           val v = row.getVariant(ordinal)
@@ -286,7 +299,8 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
         }
 
       case t: StructType =>
-        val fieldWriters = t.map(_.dataType).map(makeWriter).toArray[ValueWriter]
+        val fieldWriters =
+          t.map(_.dataType).map(makeWriter(_, inShredded)).toArray[ValueWriter]
         (row: SpecializedGetters, ordinal: Int) =>
           consumeGroup {
             writeFields(row.getStruct(ordinal, t.length), t, fieldWriters)
@@ -296,7 +310,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
       case t: MapType =>
         makeMapWriter(t)
 
-      case t: UserDefinedType[_] => makeWriter(t.sqlType)
+      case t: UserDefinedType[_] => makeWriter(t.sqlType, inShredded)
 
       case _ => throw SparkException.internalError(s"Unsupported data type $dataType.")
     }
@@ -376,7 +390,8 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
   }
 
   def makeArrayWriter(arrayType: ArrayType): ValueWriter = {
-    val elementWriter = makeWriter(arrayType.elementType)
+    // The shredded schema should not have an array inside
+    val elementWriter = makeWriter(arrayType.elementType, inShredded = false)
 
     def threeLevelArrayWriter(repeatedGroupName: String, elementFieldName: String): ValueWriter =
       (row: SpecializedGetters, ordinal: Int) => {
@@ -456,8 +471,8 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
   }
 
   private def makeMapWriter(mapType: MapType): ValueWriter = {
-    val keyWriter = makeWriter(mapType.keyType)
-    val valueWriter = makeWriter(mapType.valueType)
+    val keyWriter = makeWriter(mapType.keyType, inShredded = false)
+    val valueWriter = makeWriter(mapType.valueType, inShredded = false)
     val repeatedGroupName = if (writeLegacyParquetFormat) {
       // Legacy mode:
       //
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
index be83d2c65323..dd34cab2e77e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
@@ -940,7 +940,7 @@ class QueryCompilationErrorsSuite
     }
     checkError(
       exception = intercept[AnalysisException] {
-        converter.convertField(StructField("test", dummyDataType))
+        converter.convertField(StructField("test", dummyDataType), inShredded = false)
       },
       condition = "INTERNAL_ERROR",
       parameters = Map("message" -> "Cannot convert Spark data type \"DUMMY\" to any Parquet type.")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
index 4da5c264655d..458b5dfc0f4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
@@ -19,8 +19,16 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
+import org.apache.parquet.schema.{LogicalTypeAnnotation, PrimitiveType}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.unsafe.types.VariantVal
 
@@ -35,6 +43,104 @@ class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
+  testWithTempDir("timestamp physical type") { dir =>
+    ParquetOutputTimestampType.values.foreach { timestampParquetType =>
+      withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> timestampParquetType.toString) {
+        val schema = "t timestamp, st struct<t timestamp>"
+        val fullSchema = "v struct<metadata binary, value binary, typed_value struct<" +
+          "t struct<value binary, typed_value timestamp>," +
+          "st struct<" +
+          "value binary, typed_value struct<t struct<value binary, typed_value timestamp>>>>>, " +
+          "t1 timestamp, st1 struct<t1 timestamp>"
+        val df = spark.sql(
+          """
+            | select
+            |   to_variant_object(
+            |     named_struct('t', 1::timestamp, 'st', named_struct('t', 2::timestamp))
+            |   ) v, 3::timestamp t1, named_struct('t1', 4::timestamp) st1
+            | from range(1)
+            |""".stripMargin)
+        withSQLConf(SQLConf.VARIANT_WRITE_SHREDDING_ENABLED.key -> true.toString,
+          SQLConf.VARIANT_ALLOW_READING_SHREDDED.key -> true.toString,
+          SQLConf.VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key -> schema) {
+          df.write.mode("overwrite").parquet(dir.getAbsolutePath)
+          checkAnswer(
+            spark.read.parquet(dir.getAbsolutePath).selectExpr("to_json(v)"),
+            df.selectExpr("to_json(v)").collect()
+          )
+          val shreddedDf = spark.read.schema(fullSchema).parquet(dir.getAbsolutePath)
+          checkAnswer(
+            shreddedDf.selectExpr("v.typed_value.t.typed_value::long"),
+            Seq(Row(1)))
+          checkAnswer(
+            shreddedDf.selectExpr("v.typed_value.st.typed_value.t.typed_value::long"),
+            Seq(Row(2)))
+          checkAnswer(
+            shreddedDf.selectExpr("t1::long"),
+            Seq(Row(3)))
+          checkAnswer(
+            shreddedDf.selectExpr("st1.t1::long"),
+            Seq(Row(4)))
+          val file = dir.listFiles().find(_.getName.endsWith(".parquet")).get
+          val parquetFilePath = file.getAbsolutePath
+          val inputFile = HadoopInputFile.fromPath(new Path(parquetFilePath), new Configuration())
+          try {
+            val reader = ParquetFileReader.open(inputFile)
+            val footer = reader.getFooter
+            val schema = footer.getFileMetaData.getSchema
+            // v.typed_value.t.typed_value
+            val vGroup = schema.getType(schema.getFieldIndex("v")).asGroupType()
+            val typedValueGroup = vGroup.getType("typed_value").asGroupType()
+            val tGroup = typedValueGroup.getType("t").asGroupType()
+            val typedValue1 = tGroup.getType("typed_value").asPrimitiveType()
+            assert(typedValue1.getPrimitiveTypeName == PrimitiveTypeName.INT64)
+            assert(typedValue1.getLogicalTypeAnnotation == LogicalTypeAnnotation.timestampType(
+              true, LogicalTypeAnnotation.TimeUnit.MICROS))
+
+            // v.typed_value.st.typed_value.t.typed_value
+            val stGroup = typedValueGroup.getType("st").asGroupType()
+            val stTypedValueGroup = stGroup.getType("typed_value").asGroupType()
+            val stTGroup = stTypedValueGroup.getType("t").asGroupType()
+            val typedValue2 = stTGroup.getType("typed_value").asPrimitiveType()
+            assert(typedValue2.getPrimitiveTypeName == PrimitiveTypeName.INT64)
+            assert(typedValue2.getLogicalTypeAnnotation == LogicalTypeAnnotation.timestampType(
+              true, LogicalTypeAnnotation.TimeUnit.MICROS))
+
+            def verifyNonVariantTimestampType(t: PrimitiveType): Unit = {
+              timestampParquetType match {
+                case ParquetOutputTimestampType.INT96 =>
+                  assert(t.getPrimitiveTypeName == PrimitiveTypeName.INT96)
+                  assert(t.getLogicalTypeAnnotation == null)
+                case ParquetOutputTimestampType.TIMESTAMP_MICROS =>
+                  assert(t.getPrimitiveTypeName == PrimitiveTypeName.INT64)
+                  assert(t.getLogicalTypeAnnotation == LogicalTypeAnnotation.timestampType(
+                    true, LogicalTypeAnnotation.TimeUnit.MICROS))
+                case ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
+                  assert(t.getPrimitiveTypeName == PrimitiveTypeName.INT64)
+                  assert(t.getLogicalTypeAnnotation == LogicalTypeAnnotation.timestampType(
+                    true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+              }
+            }
+
+            // t1
+            val t1Value = schema.getType(schema.getFieldIndex("t1")).asPrimitiveType()
+            verifyNonVariantTimestampType(t1Value)
+
+            // st1.t1
+            val st1Group = schema.getType(schema.getFieldIndex("st1")).asGroupType()
+            val st1T1Value = st1Group.getType("t1").asPrimitiveType()
+            verifyNonVariantTimestampType(st1T1Value)
+            reader.close()
+          } catch {
+            case e: Exception => e.printStackTrace()
+          }
+        }
+      }
+    }
+  }
+
   testWithTempDir("write shredded variant basic") { dir =>
     val schema = "a int, b string, c decimal(15, 1)"
     val df = spark.sql(
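For reference, the behaviour guaranteed by this patch can also be observed from user code. The sketch below is illustrative only and not part of the commit: the output directory and the forced shredding schema are assumptions, `spark` is assumed to be an existing SparkSession (e.g. in spark-shell), and the conf keys and Parquet footer APIs are the same ones used in the new test above. It writes one shredded Variant containing a timestamp while the session asks for INT96 output, then inspects the footer; the shredded timestamp leaf comes back as INT64 annotated TIMESTAMP(MICROS, true) regardless of the session setting.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.spark.sql.internal.SQLConf

// Ask for the legacy INT96 timestamp layout and force a shredding schema for the Variant column.
// (VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST is a testing conf, used here only for illustration.)
spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "INT96")
spark.conf.set(SQLConf.VARIANT_WRITE_SHREDDING_ENABLED.key, "true")
spark.conf.set(SQLConf.VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key, "t timestamp")

val outDir = "/tmp/shredded-variant-demo"  // hypothetical output location
spark.sql("select to_variant_object(named_struct('t', 1::timestamp)) v from range(1)")
  .write.mode("overwrite").parquet(outDir)

// Open the footer of the written file and look at the shredded timestamp leaf
// v.typed_value.t.typed_value: with this fix it is INT64 / TIMESTAMP(MICROS, true), not INT96.
val parquetFile = new java.io.File(outDir).listFiles().find(_.getName.endsWith(".parquet")).get
val reader = ParquetFileReader.open(
  HadoopInputFile.fromPath(new Path(parquetFile.getAbsolutePath), new Configuration()))
try {
  val fileSchema = reader.getFooter.getFileMetaData.getSchema
  val leaf = fileSchema.getType("v").asGroupType()
    .getType("typed_value").asGroupType()
    .getType("t").asGroupType()
    .getType("typed_value").asPrimitiveType()
  println(leaf)  // e.g. optional int64 typed_value (TIMESTAMP(MICROS,true))
} finally {
  reader.close()
}
```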