This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4882bb255c0922f016f7faee29c4e9bc330a114f
Author: Tim Brown <[email protected]>
AuthorDate: Wed Oct 15 14:02:36 2025 -0400

    fix: Spark Schema Evolution Fix for nested columns (#14075)
---
 .../hudi/io/storage/HoodieSparkParquetReader.java  |   8 +-
 .../parquet/HoodieParquetReadSupport.scala         |  98 ++++++++++++++++++++
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   5 +-
 .../parquet/TestHoodieParquetReadSupport.scala     | 103 +++++++++++++++++++++
 .../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala  |   2 -
 .../apache/spark/sql/adapter/Spark3_3Adapter.scala |   6 +-
 .../Spark33LegacyHoodieParquetFileFormat.scala     |   2 +-
 .../datasources/parquet/Spark33ParquetReader.scala |   2 +-
 .../apache/spark/sql/adapter/Spark3_4Adapter.scala |   6 +-
 .../Spark34LegacyHoodieParquetFileFormat.scala     |   2 +-
 .../datasources/parquet/Spark34ParquetReader.scala |   2 +-
 .../apache/spark/sql/adapter/Spark3_5Adapter.scala |   6 +-
 .../Spark35LegacyHoodieParquetFileFormat.scala     |   2 +-
 .../datasources/parquet/Spark35ParquetReader.scala |   2 +-
 .../apache/spark/sql/adapter/Spark4_0Adapter.scala |   6 +-
 .../Spark40LegacyHoodieParquetFileFormat.scala     |   2 +-
 .../datasources/parquet/Spark40ParquetReader.scala |   2 +-
 17 files changed, 240 insertions(+), 16 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index fda9a3871f05..5e5e6b9b4216 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -44,6 +44,7 @@ import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetReadSupport;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
 import org.apache.spark.sql.execution.datasources.parquet.SparkBasicSchemaEvolution;
@@ -55,6 +56,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;

+import scala.Option$;
+
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
@@ -123,7 +126,10 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
     storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), readSchemaJson);
     storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
     storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
-    ParquetReader<InternalRow> reader = ParquetReader.builder(new ParquetReadSupport(), new Path(path.toUri()))
+    ParquetReader<InternalRow> reader = ParquetReader.builder(new HoodieParquetReadSupport(Option$.MODULE$.empty(), true,
+        SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("CORRECTED"),
+        SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY")),
+        new Path(path.toUri()))
         .withConf(storage.getConf().unwrapAs(Configuration.class))
         .build();
     UnsafeProjection projection = evolution.generateUnsafeProjection();
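Context for the change above, as an illustration rather than text from the commit: HoodieParquetReadSupport (added in the next file) extends Spark's ParquetReadSupport, whose init() clips the requested Parquet schema against the file's schema. When a table's nested columns have evolved, every requested child of a nested group can be missing from an older file, and the clipped schema can contain a group with no leaves, which parquet-mr rejects. A minimal Scala sketch of the problematic shape, built with parquet's schema API:

import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Types

// File written before the nested column changed: `b` only has `nested_b`.
val fileSchema = Types.buildMessage()
  .addField(Types.requiredGroup()
    .addField(Types.required(PrimitiveTypeName.INT32).named("nested_b")).named("b"))
  .named("file")

// Evolved read schema: `b` only has `nested_a`, which the old file lacks.
val requestedSchema = Types.buildMessage()
  .addField(Types.requiredGroup()
    .addField(Types.required(PrimitiveTypeName.INT32).named("nested_a")).named("b"))
  .named("spark_schema")

// Intersecting the two would leave group `b` with zero fields; the new
// trimParquetSchema (next file) drops such groups so the read succeeds and
// Spark back-fills the missing nested values as nulls during projection.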
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala
new file mode 100644
index 000000000000..d55b8c8d25d5
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.util.ValidationUtils
+
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.schema.{GroupType, MessageType, Type, Types}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+
+import java.time.ZoneId
+
+import scala.collection.JavaConverters._
+
+class HoodieParquetReadSupport(
+    convertTz: Option[ZoneId],
+    enableVectorizedReader: Boolean,
+    datetimeRebaseSpec: RebaseSpec,
+    int96RebaseSpec: RebaseSpec)
+  extends ParquetReadSupport(convertTz, enableVectorizedReader, datetimeRebaseSpec, int96RebaseSpec) with SparkAdapterSupport {
+
+  override def init(context: InitContext): ReadContext = {
+    val readContext = super.init(context)
+    val requestedParquetSchema = readContext.getRequestedSchema
+    val trimmedParquetSchema = HoodieParquetReadSupport.trimParquetSchema(requestedParquetSchema, context.getFileSchema)
+    new ReadContext(trimmedParquetSchema, readContext.getReadSupportMetadata)
+  }
+
+}
+
+object HoodieParquetReadSupport {
+  /**
+   * Removes any fields from the parquet schema that do not have any child fields in the actual file schema after the
+   * schema is trimmed down to the requested fields. This can happen when the table schema evolves and only a subset of
+   * the nested fields are required by the query.
+   *
+   * @param requestedSchema the initial parquet schema requested by Spark
+   * @param fileSchema      the actual parquet schema of the file
+   * @return a potentially updated schema with empty struct fields removed
+   */
+  def trimParquetSchema(requestedSchema: MessageType, fileSchema: MessageType): MessageType = {
+    val trimmedFields = requestedSchema.getFields.asScala.map(field => {
+      if (fileSchema.containsField(field.getName)) {
+        trimParquetType(field, fileSchema.asGroupType().getType(field.getName))
+      } else {
+        Some(field)
+      }
+    }).filter(_.isDefined).map(_.get).toArray[Type]
+    Types.buildMessage().addFields(trimmedFields: _*).named(requestedSchema.getName)
+  }
+
+  private def trimParquetType(requestedType: Type, fileType: Type): Option[Type] = {
+    if (requestedType.equals(fileType)) {
+      Some(requestedType)
+    } else {
+      requestedType match {
+        case groupType: GroupType =>
+          ValidationUtils.checkState(!fileType.isPrimitive,
+            "Group type provided by requested schema but existing type in the file is a primitive")
+          val fileTypeGroup = fileType.asGroupType()
+          var hasMatchingField = false
+          val fields = groupType.getFields.asScala.map(field => {
+            if (fileTypeGroup.containsField(field.getName)) {
+              hasMatchingField = true
+              trimParquetType(field, fileType.asGroupType().getType(field.getName))
+            } else {
+              Some(field)
+            }
+          }).filter(_.isDefined).map(_.get).asJava
+          if (hasMatchingField && !fields.isEmpty) {
+            Some(groupType.withNewFields(fields))
+          } else {
+            None
+          }
+        case _ => Some(requestedType)
+      }
+    }
+  }
+}
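To make the trimming rules concrete, here is an illustrative snippet (not part of the commit) that exercises trimParquetSchema directly; the schema shapes mirror the unit tests added further down:

import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Types
import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetReadSupport

// Requested: `a` (binary) plus struct `b(nested_a)`; file: `a` plus `b(nested_b)`.
val requested = Types.buildMessage()
  .addField(Types.required(PrimitiveTypeName.BINARY).named("a"))
  .addField(Types.requiredGroup()
    .addField(Types.required(PrimitiveTypeName.INT32).named("nested_a")).named("b"))
  .named("spark_schema")
val file = Types.buildMessage()
  .addField(Types.required(PrimitiveTypeName.BINARY).named("a"))
  .addField(Types.requiredGroup()
    .addField(Types.required(PrimitiveTypeName.INT32).named("nested_b")).named("b"))
  .named("file")

// None of `b`'s requested children exist in the file, so `b` is dropped entirely.
// A top-level field absent from the file (a brand-new column) is kept as-is,
// since Spark fills it with nulls at read time.
val trimmed = HoodieParquetReadSupport.trimParquetSchema(requested, file)
// trimmed == message "spark_schema" containing only the required binary field `a`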
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index aa03be2872e2..b803084831fc 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
 import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.catalyst.util.DateFormatter
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -45,7 +46,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.parser.HoodieExtendedParserInterface
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.{DataType, Metadata, StructType}
-import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
@@ -242,6 +243,8 @@ trait SparkAdapter extends Serializable {

   def isTimestampNTZType(dataType: DataType): Boolean

+  def getRebaseSpec(policy: String): RebaseSpec
+
   /**
    * Gets the [[UTF8String]] factory implementation for the current Spark version.
    * [SPARK-46832] [[UTF8String]] doesn't support compareTo anymore since Spark 4.0
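The new getRebaseSpec hook lets version-agnostic code such as HoodieSparkParquetReader above build a RebaseSpec without touching version-specific classes; each concrete adapter below implements it identically through LegacyBehaviorPolicy. An illustrative usage sketch (the object name is hypothetical; policy semantics are as documented by Spark):

import org.apache.hudi.SparkAdapterSupport

object RebaseSpecExample extends SparkAdapterSupport {
  // "CORRECTED": read date/timestamp values as written, with no calendar rebase.
  // "LEGACY": rebase between the hybrid Julian and Proleptic Gregorian calendars.
  val datetimeRebaseSpec = sparkAdapter.getRebaseSpec("CORRECTED")
  val int96RebaseSpec = sparkAdapter.getRebaseSpec("LEGACY")
  // These mirror the arguments passed when constructing HoodieParquetReadSupport
  // in HoodieSparkParquetReader above.
}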
diff --git a/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala
new file mode 100644
index 000000000000..456cca7507f9
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.Types
+import org.junit.jupiter.api.{Assertions, Test}
+
+class TestHoodieParquetReadSupport {
+
+  /**
+   * Validate that when none of the child fields of a nested struct or array field match between the
+   * requested schema and the actual file schema, the entire struct/array field is removed from
+   * the requested schema. For map fields, the key type is matched and retained even if
+   * the value type does not have any matching fields.
+   */
+  @Test
+  def testSchemaTrimming_noRemainingFields(): Unit = {
+    val requiredNestedField = Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("nested_a"))
+    val dataNestedField = Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("nested_b"))
+    val requiredArrayField = Types.requiredList().optionalGroupElement().addField(requiredNestedField.named("element")).named("list")
+    val dataArrayField = Types.requiredList().optionalGroupElement().addField(dataNestedField.named("element")).named("list")
+    val requiredMapField = Types.requiredMap().key(PrimitiveTypeName.BINARY).value(requiredNestedField.named("value")).named("key_value")
+    val dataMapField = Types.requiredMap().key(PrimitiveTypeName.BINARY).value(dataNestedField.named("value")).named("key_value")
+    val requiredSchema = Types.buildMessage()
+      .addField(Types.required(PrimitiveTypeName.BINARY).named("a"))
+      .addField(requiredNestedField.named("b"))
+      .addField(requiredArrayField)
+      .addField(requiredMapField)
+      .addField(Types.required(PrimitiveTypeName.BINARY).named("e"))
+      .named("required")
+    val dataSchema = Types.buildMessage()
+      .addField(Types.required(PrimitiveTypeName.BINARY).named("a"))
+      .addField(dataNestedField.named("b"))
+      .addField(dataArrayField)
+      .addField(dataMapField)
+      .addField(Types.required(PrimitiveTypeName.BINARY).named("e"))
+      .named("data")
+
+    val trimmedSchema = HoodieParquetReadSupport.trimParquetSchema(requiredSchema, dataSchema)
+
+    // The nested struct field "b" and the array field "list" are removed because they do not have any
+    // matching child fields in the data schema. The map field "key_value" is retained because the key type
+    // matches even though the value struct does not have any matching fields.
+    val expectedSchema = Types.buildMessage()
+      .addField(Types.required(PrimitiveTypeName.BINARY).named("a"))
+      // only the key value is retained because the value struct does not have any matching fields
+      .addField(Types.requiredMap().key(PrimitiveTypeName.BINARY).named("key_value"))
+      .addField(Types.required(PrimitiveTypeName.BINARY).named("e"))
+      .named("required")
+
+    Assertions.assertEquals(expectedSchema, trimmedSchema)
+  }
+
+  /**
+   * Validate that when at least one child field of a nested struct/array/map field matches between the
+   * requested schema and the actual file schema, the entire struct/array/map field is retained.
+   */
+  @Test
+  def testSchemaTrimming_atLeastOneFieldMatches(): Unit = {
+    val requiredNestedField = Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("nested_a"))
+      .addField(Types.required(PrimitiveTypeName.INT32).named("nested_b"))
+    val dataNestedField = Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("nested_b"))
+      .addField(Types.required(PrimitiveTypeName.INT32).named("nested_c"))
+    val requiredArrayField = Types.requiredList().optionalGroupElement().addField(requiredNestedField.named("element")).named("list")
+    val dataArrayField = Types.requiredList().optionalGroupElement().addField(dataNestedField.named("element")).named("list")
+    val requiredMapField = Types.requiredMap().key(PrimitiveTypeName.BINARY).value(requiredNestedField.named("value")).named("key_value")
+    val dataMapField = Types.requiredMap().key(PrimitiveTypeName.BINARY).value(dataNestedField.named("value")).named("key_value")
+    val requiredSchema = Types.buildMessage()
+      .addField(Types.required(PrimitiveTypeName.BINARY).named("a"))
+      .addField(requiredNestedField.named("b"))
+      .addField(requiredArrayField)
+      .addField(requiredMapField)
+      .addField(Types.required(PrimitiveTypeName.BINARY).named("e"))
+      .named("required")
+    val dataSchema = Types.buildMessage()
+      .addField(Types.required(PrimitiveTypeName.BINARY).named("a"))
+      .addField(dataNestedField.named("b"))
+      .addField(dataArrayField)
+      .addField(dataMapField)
+      .addField(Types.required(PrimitiveTypeName.BINARY).named("e"))
+      .named("data")
+
+    val trimmedSchema = HoodieParquetReadSupport.trimParquetSchema(requiredSchema, dataSchema)
+
+    Assertions.assertEquals(requiredSchema, trimmedSchema)
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 49e7740eecb2..e53f6826dd7d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -853,7 +853,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
         .save(tablePath)

       val oldView = spark.read.format("hudi").options(readOpt)
-        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),"false")
         .load(tablePath)
       oldView.show(5, false)

@@ -870,7 +869,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
         .mode(SaveMode.Append)
         .save(tablePath)
       spark.read.format("hudi").options(readOpt)
-        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),"false")
         .load(tablePath).registerTempTable("newView")
       val checkResult = spark.sql(s"select tip_history.amount,city_to_state,distance_in_meters,fare,height from newView where _row_key='$checkRowKey' ")
         .collect().map(row => (row.isNullAt(0), row.isNullAt(1), row.isNullAt(2), row.isNullAt(3), row.isNullAt(4)))
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index a135aba22808..25594e2e41f5 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
+import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY, RebaseDateTime}
 import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.orc.Spark33OrcReader
@@ -167,4 +167,8 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
   override def isTimestampNTZType(dataType: DataType): Boolean = {
     dataType.getClass.getSimpleName.startsWith("TimestampNTZType")
   }
+
+  override def getRebaseSpec(policy: String): RebaseDateTime.RebaseSpec = {
+    RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
index ffbe759dc2ae..71bf1e542d1a 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
@@ -320,7 +320,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
           DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
         val datetimeRebaseSpec =
           DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
-        val readSupport = new ParquetReadSupport(
+        val readSupport = new HoodieParquetReadSupport(
           convertTz,
           enableVectorizedReader = false,
           datetimeRebaseSpec,
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
index 0ec88047f26d..50fc06ea3187 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
@@ -189,7 +189,7 @@ class Spark33ParquetReader(enableVectorizedReader: Boolean,
       }
     } else {
       // ParquetRecordReader returns InternalRow
-      val readSupport = new ParquetReadSupport(
+      val readSupport = new HoodieParquetReadSupport(
        convertTz,
        enableVectorizedReader = false,
        datetimeRebaseSpec,
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index 7774d01e239b..a652885fd75e 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
+import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY, RebaseDateTime}
 import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.orc.Spark34OrcReader
@@ -166,4 +166,8 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
   override def isTimestampNTZType(dataType: DataType): Boolean = {
     dataType == DataTypes.TimestampNTZType
   }
+
+  override def getRebaseSpec(policy: String): RebaseDateTime.RebaseSpec = {
+    RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
index 18bab34a247a..58f195f7253a 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
@@ -331,7 +331,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
           DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
         val datetimeRebaseSpec =
           DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
-        val readSupport = new ParquetReadSupport(
+        val readSupport = new HoodieParquetReadSupport(
           convertTz,
           enableVectorizedReader = false,
           datetimeRebaseSpec,
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
index 7cd17f1664f2..9692b343dd66 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
@@ -186,7 +186,7 @@ class Spark34ParquetReader(enableVectorizedReader: Boolean,
       }
     } else {
       // ParquetRecordReader returns InternalRow
-      val readSupport = new ParquetReadSupport(
+      val readSupport = new HoodieParquetReadSupport(
        convertTz,
        enableVectorizedReader = false,
        datetimeRebaseSpec,
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
index e2c937ebbe40..a140733c5c4a 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
+import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY, RebaseDateTime}
 import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.orc.Spark35OrcReader
@@ -182,4 +182,8 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
   override def isTimestampNTZType(dataType: DataType): Boolean = {
     dataType == DataTypes.TimestampNTZType
   }
+
+  override def getRebaseSpec(policy: String): RebaseDateTime.RebaseSpec = {
+    RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
index f90e675d768f..e8fc3b3f6479 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
@@ -332,7 +332,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
           DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
         val datetimeRebaseSpec =
           DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
-        val readSupport = new ParquetReadSupport(
+        val readSupport = new HoodieParquetReadSupport(
           convertTz,
           enableVectorizedReader = false,
           datetimeRebaseSpec,
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
index 3f2d97b43629..3d192dd3a170 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
@@ -193,7 +193,7 @@ class Spark35ParquetReader(enableVectorizedReader: Boolean,
       }
     } else {
       // ParquetRecordReader returns InternalRow
-      val readSupport = new ParquetReadSupport(
+      val readSupport = new HoodieParquetReadSupport(
        convertTz,
        enableVectorizedReader = false,
        datetimeRebaseSpec,
diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
index b152ecfb2eab..ede4cdf23ddf 100644
--- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
+import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY, RebaseDateTime}
 import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.orc.Spark40OrcReader
@@ -182,4 +182,8 @@ class Spark4_0Adapter extends BaseSpark4Adapter {
   override def isTimestampNTZType(dataType: DataType): Boolean = {
     dataType == DataTypes.TimestampNTZType
   }
+
+  override def getRebaseSpec(policy: String): RebaseDateTime.RebaseSpec = {
+    RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala
index ad0b33fee239..75190bc135c3 100644
--- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala
@@ -329,7 +329,7 @@ class Spark40LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
         }
       } else {
         logDebug(s"Falling back to parquet-mr")
-        val readSupport = new ParquetReadSupport(
+        val readSupport = new HoodieParquetReadSupport(
           convertTz,
           enableVectorizedReader = false,
           datetimeRebaseSpec,
diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
index 1431308a4092..5e8cea9f11a2 100644
--- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
+++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
@@ -193,7 +193,7 @@ class Spark40ParquetReader(enableVectorizedReader: Boolean,
       }
     } else {
       // ParquetRecordReader returns InternalRow
-      val readSupport = new ParquetReadSupport(
+      val readSupport = new HoodieParquetReadSupport(
        convertTz,
        enableVectorizedReader = false,
        datetimeRebaseSpec,
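For readers who want to see the scenario end to end, a hypothetical repro sketch follows; the table and column names are invented, a SparkSession `spark` is assumed to be in scope, and hoodie.schema.on.read.enable is assumed for nested-column evolution, along the lines of the TestSpark3DDL coverage above:

spark.sql("set hoodie.schema.on.read.enable=true")
spark.sql("create table t (id int, info struct<a: string>) using hudi")
spark.sql("insert into t select 1, named_struct('a', 'x')")
// Add a nested column; parquet files written before this point have no `info.b`.
spark.sql("alter table t add columns (info.b string)")
// A query projecting only the new nested field asks the old file for a struct
// none of whose requested children exist there. Before this fix the parquet-mr
// (non-vectorized) read path could fail on the resulting empty group; with
// schema trimming the query succeeds and `info.b` reads back as null.
spark.sql("select id, info.b from t").show(false)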
