This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4882bb255c0922f016f7faee29c4e9bc330a114f
Author: Tim Brown <[email protected]>
AuthorDate: Wed Oct 15 14:02:36 2025 -0400

    fix: Spark Schema Evolution Fix for nested columns (#14075)
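    
    After schema evolution, the parquet schema Spark requests can contain
    nested struct/array fields none of whose children exist in a given data
    file. This change introduces HoodieParquetReadSupport, which trims such
    group fields from the requested schema before handing it to parquet-mr,
    and wires it into HoodieSparkParquetReader and the Spark 3.3 through 4.0
    parquet readers and legacy file formats. A short illustrative sketch of
    the trimming behavior follows the new HoodieParquetReadSupport.scala
    diff below.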
---
 .../hudi/io/storage/HoodieSparkParquetReader.java  |   8 +-
 .../parquet/HoodieParquetReadSupport.scala         |  98 ++++++++++++++++++++
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   5 +-
 .../parquet/TestHoodieParquetReadSupport.scala     | 103 +++++++++++++++++++++
 .../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala  |   2 -
 .../apache/spark/sql/adapter/Spark3_3Adapter.scala |   6 +-
 .../Spark33LegacyHoodieParquetFileFormat.scala     |   2 +-
 .../datasources/parquet/Spark33ParquetReader.scala |   2 +-
 .../apache/spark/sql/adapter/Spark3_4Adapter.scala |   6 +-
 .../Spark34LegacyHoodieParquetFileFormat.scala     |   2 +-
 .../datasources/parquet/Spark34ParquetReader.scala |   2 +-
 .../apache/spark/sql/adapter/Spark3_5Adapter.scala |   6 +-
 .../Spark35LegacyHoodieParquetFileFormat.scala     |   2 +-
 .../datasources/parquet/Spark35ParquetReader.scala |   2 +-
 .../apache/spark/sql/adapter/Spark4_0Adapter.scala |   6 +-
 .../Spark40LegacyHoodieParquetFileFormat.scala     |   2 +-
 .../datasources/parquet/Spark40ParquetReader.scala |   2 +-
 17 files changed, 240 insertions(+), 16 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index fda9a3871f05..5e5e6b9b4216 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -44,6 +44,7 @@ import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetReadSupport;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
 import org.apache.spark.sql.execution.datasources.parquet.SparkBasicSchemaEvolution;
@@ -55,6 +56,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import scala.Option$;
+
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
 
@@ -123,7 +126,10 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
     storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), readSchemaJson);
     storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
     storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
-    ParquetReader<InternalRow> reader = ParquetReader.builder(new ParquetReadSupport(), new Path(path.toUri()))
+    ParquetReader<InternalRow> reader = ParquetReader.builder(new HoodieParquetReadSupport(Option$.MODULE$.empty(), true,
+            SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("CORRECTED"),
+            SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY")),
+            new Path(path.toUri()))
         .withConf(storage.getConf().unwrapAs(Configuration.class))
         .build();
     UnsafeProjection projection = evolution.generateUnsafeProjection();
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala
new file mode 100644
index 000000000000..d55b8c8d25d5
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.util.ValidationUtils
+
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.schema.{GroupType, MessageType, Type, Types}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+
+import java.time.ZoneId
+
+import scala.collection.JavaConverters._
+
+class HoodieParquetReadSupport(
+                                convertTz: Option[ZoneId],
+                                enableVectorizedReader: Boolean,
+                                datetimeRebaseSpec: RebaseSpec,
+                                int96RebaseSpec: RebaseSpec)
+  extends ParquetReadSupport(convertTz, enableVectorizedReader, datetimeRebaseSpec, int96RebaseSpec) with SparkAdapterSupport {
+
+  override def init(context: InitContext): ReadContext = {
+    val readContext = super.init(context)
+    val requestedParquetSchema = readContext.getRequestedSchema
+    val trimmedParquetSchema = HoodieParquetReadSupport.trimParquetSchema(requestedParquetSchema, context.getFileSchema)
+    new ReadContext(trimmedParquetSchema, readContext.getReadSupportMetadata)
+  }
+
+}
+
+object HoodieParquetReadSupport {
+  /**
+   * Removes any fields from the parquet schema that do not have any child fields in the actual file schema after the
+   * schema is trimmed down to the requested fields. This can happen when the table schema evolves and only a subset of
+   * the nested fields are required by the query.
+   *
+   * @param requestedSchema the initial parquet schema requested by Spark
+   * @param fileSchema the actual parquet schema of the file
+   * @return a potentially updated schema with empty struct fields removed
+   */
+  def trimParquetSchema(requestedSchema: MessageType, fileSchema: MessageType): MessageType = {
+    val trimmedFields = requestedSchema.getFields.asScala.map(field => {
+      if (fileSchema.containsField(field.getName)) {
+        trimParquetType(field, fileSchema.asGroupType().getType(field.getName))
+      } else {
+        Some(field)
+      }
+    }).filter(_.isDefined).map(_.get).toArray[Type]
+    Types.buildMessage().addFields(trimmedFields: _*).named(requestedSchema.getName)
+  }
+
+  private def trimParquetType(requestedType: Type, fileType: Type): Option[Type] = {
+    if (requestedType.equals(fileType)) {
+      Some(requestedType)
+    } else {
+      requestedType match {
+        case groupType: GroupType =>
+          ValidationUtils.checkState(!fileType.isPrimitive,
+            "Group type provided by requested schema but existing type in the 
file is a primitive")
+          val fileTypeGroup = fileType.asGroupType()
+          var hasMatchingField = false
+          val fields = groupType.getFields.asScala.map(field => {
+            if (fileTypeGroup.containsField(field.getName)) {
+              hasMatchingField = true
+              trimParquetType(field, fileType.asGroupType().getType(field.getName))
+            } else {
+              Some(field)
+            }
+          }).filter(_.isDefined).map(_.get).asJava
+          if (hasMatchingField && !fields.isEmpty) {
+            Some(groupType.withNewFields(fields))
+          } else {
+            None
+          }
+        case _ => Some(requestedType)
+      }
+    }
+  }
+}
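
For illustration, here is a minimal sketch of how trimParquetSchema behaves under nested schema evolution. The schemas are hypothetical and the snippet is modeled on the tests added in this commit; it is not part of the patch itself:

    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
    import org.apache.parquet.schema.Types

    // Requested schema: primitive "a" plus struct "b" whose only child is the
    // evolved field "nested_a".
    val requested = Types.buildMessage()
      .addField(Types.required(PrimitiveTypeName.BINARY).named("a"))
      .addField(Types.requiredGroup()
        .addField(Types.required(PrimitiveTypeName.INT32).named("nested_a"))
        .named("b"))
      .named("spark_schema")

    // File schema: written before "nested_a" existed; its "b" only has "nested_b".
    val file = Types.buildMessage()
      .addField(Types.required(PrimitiveTypeName.BINARY).named("a"))
      .addField(Types.requiredGroup()
        .addField(Types.required(PrimitiveTypeName.INT32).named("nested_b"))
        .named("b"))
      .named("spark_schema")

    // None of the requested children of "b" exist in the file, so "b" is dropped
    // from the schema handed to parquet-mr rather than being requested as a group
    // with no readable children; "a" is kept unchanged.
    val trimmed = HoodieParquetReadSupport.trimParquetSchema(requested, file)
    assert(trimmed.getFieldCount == 1)
    assert(trimmed.containsField("a"))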
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index aa03be2872e2..b803084831fc 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
 import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.catalyst.util.DateFormatter
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -45,7 +46,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.parser.HoodieExtendedParserInterface
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.{DataType, Metadata, StructType}
-import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -242,6 +243,8 @@ trait SparkAdapter extends Serializable {
 
   def isTimestampNTZType(dataType: DataType): Boolean
 
+  def getRebaseSpec(policy: String): RebaseSpec
+
   /**
   * Gets the [[UTF8String]] factory implementation for the current Spark version.
   * [SPARK-46832] [[UTF8String]] doesn't support compareTo anymore since Spark 4.0
diff --git a/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala
new file mode 100644
index 000000000000..456cca7507f9
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.Types
+import org.junit.jupiter.api.{Assertions, Test}
+
+class TestHoodieParquetReadSupport {
+
+  /**
+   * Validate that when none of the child fields of a nested struct or array field match between the
+   * requested schema and the actual file schema, the entire struct/array field is removed from
+   * the requested schema. For map fields, the key type is matched and retained even if
+   * the value type does not have any matching fields.
+   */
+  @Test
+  def testSchemaTrimming_noRemainingFields(): Unit = {
+    val requiredNestedField = Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("nested_a"))
+    val dataNestedField = Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("nested_b"))
+    val requiredArrayField = Types.requiredList().optionalGroupElement().addField(requiredNestedField.named("element")).named("list")
+    val dataArrayField = Types.requiredList().optionalGroupElement().addField(dataNestedField.named("element")).named("list")
+    val requiredMapField = Types.requiredMap().key(PrimitiveTypeName.BINARY).value(requiredNestedField.named("value")).named("key_value")
+    val dataMapField = Types.requiredMap().key(PrimitiveTypeName.BINARY).value(dataNestedField.named("value")).named("key_value")
+    val requiredSchema = Types.buildMessage()
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("a"))
+        .addField(requiredNestedField.named("b"))
+        .addField(requiredArrayField)
+        .addField(requiredMapField)
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("e"))
+        .named("required")
+    val dataSchema = Types.buildMessage()
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("a"))
+        .addField(dataNestedField.named("b"))
+        .addField(dataArrayField)
+        .addField(dataMapField)
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("e"))
+        .named("data")
+
+    val trimmedSchema = HoodieParquetReadSupport.trimParquetSchema(requiredSchema, dataSchema)
+
+    // The nested struct field "b" and the array field "list" are removed because they do not have any
+    // matching child fields in the data schema. The map field "key_value" is retained because the key type
+    // matches even though the value struct does not have any matching fields.
+    val expectedSchema = Types.buildMessage()
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("a"))
+        // only the map key is retained because the value struct does not have any matching fields
+        .addField(Types.requiredMap().key(PrimitiveTypeName.BINARY).named("key_value"))
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("e"))
+        .named("required")
+
+    Assertions.assertEquals(expectedSchema, trimmedSchema)
+  }
+
+  /**
+   * Validate that when at least one child field of a nested struct/array/map field matches between the
+   * requested schema and the actual file schema, the entire struct/array/map field is retained.
+   */
+  @Test
+  def testSchemaTrimming_atLeastOneFieldMatches(): Unit = {
+    val requiredNestedField = Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("nested_a"))
+      .addField(Types.required(PrimitiveTypeName.INT32).named("nested_b"))
+    val dataNestedField = Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("nested_b"))
+      .addField(Types.required(PrimitiveTypeName.INT32).named("nested_c"))
+    val requiredArrayField = Types.requiredList().optionalGroupElement().addField(requiredNestedField.named("element")).named("list")
+    val dataArrayField = Types.requiredList().optionalGroupElement().addField(dataNestedField.named("element")).named("list")
+    val requiredMapField = Types.requiredMap().key(PrimitiveTypeName.BINARY).value(requiredNestedField.named("value")).named("key_value")
+    val dataMapField = Types.requiredMap().key(PrimitiveTypeName.BINARY).value(dataNestedField.named("value")).named("key_value")
+    val requiredSchema = Types.buildMessage()
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("a"))
+        .addField(requiredNestedField.named("b"))
+        .addField(requiredArrayField)
+        .addField(requiredMapField)
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("e"))
+        .named("required")
+    val dataSchema = Types.buildMessage()
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("a"))
+        .addField(dataNestedField.named("b"))
+        .addField(dataArrayField)
+        .addField(dataMapField)
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("e"))
+        .named("data")
+
+    val trimmedSchema = HoodieParquetReadSupport.trimParquetSchema(requiredSchema, dataSchema)
+
+    Assertions.assertEquals(requiredSchema, trimmedSchema)
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 49e7740eecb2..e53f6826dd7d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -853,7 +853,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           .save(tablePath)
 
         val oldView = spark.read.format("hudi").options(readOpt)
-            .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),"false")
             .load(tablePath)
         oldView.show(5, false)
 
@@ -870,7 +869,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           .mode(SaveMode.Append)
           .save(tablePath)
         spark.read.format("hudi").options(readOpt)
-            .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),"false")
             .load(tablePath).registerTempTable("newView")
         val checkResult = spark.sql(s"select tip_history.amount,city_to_state,distance_in_meters,fare,height from newView where _row_key='$checkRowKey' ")
           .collect().map(row => (row.isNullAt(0), row.isNullAt(1), row.isNullAt(2), row.isNullAt(3), row.isNullAt(4)))
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index a135aba22808..25594e2e41f5 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
+import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY, RebaseDateTime}
 import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.orc.Spark33OrcReader
@@ -167,4 +167,8 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
   override def isTimestampNTZType(dataType: DataType): Boolean = {
     dataType.getClass.getSimpleName.startsWith("TimestampNTZType")
   }
+
+  override def getRebaseSpec(policy: String): RebaseDateTime.RebaseSpec = {
+    RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
index ffbe759dc2ae..71bf1e542d1a 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
@@ -320,7 +320,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
           DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
         val datetimeRebaseSpec =
           DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
-        val readSupport = new ParquetReadSupport(
+        val readSupport = new HoodieParquetReadSupport(
           convertTz,
           enableVectorizedReader = false,
           datetimeRebaseSpec,
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
index 0ec88047f26d..50fc06ea3187 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
@@ -189,7 +189,7 @@ class Spark33ParquetReader(enableVectorizedReader: Boolean,
       }
     } else {
       // ParquetRecordReader returns InternalRow
-      val readSupport = new ParquetReadSupport(
+      val readSupport = new HoodieParquetReadSupport(
         convertTz,
         enableVectorizedReader = false,
         datetimeRebaseSpec,
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index 7774d01e239b..a652885fd75e 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
+import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY, RebaseDateTime}
 import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.orc.Spark34OrcReader
@@ -166,4 +166,8 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
   override def isTimestampNTZType(dataType: DataType): Boolean = {
     dataType == DataTypes.TimestampNTZType
   }
+
+  override def getRebaseSpec(policy: String): RebaseDateTime.RebaseSpec = {
+    RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
index 18bab34a247a..58f195f7253a 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
@@ -331,7 +331,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
           DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
         val datetimeRebaseSpec =
           DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
-        val readSupport = new ParquetReadSupport(
+        val readSupport = new HoodieParquetReadSupport(
           convertTz,
           enableVectorizedReader = false,
           datetimeRebaseSpec,
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
index 7cd17f1664f2..9692b343dd66 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
@@ -186,7 +186,7 @@ class Spark34ParquetReader(enableVectorizedReader: Boolean,
       }
     } else {
       // ParquetRecordReader returns InternalRow
-      val readSupport = new ParquetReadSupport(
+      val readSupport = new HoodieParquetReadSupport(
         convertTz,
         enableVectorizedReader = false,
         datetimeRebaseSpec,
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
index e2c937ebbe40..a140733c5c4a 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
+import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY, RebaseDateTime}
 import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.orc.Spark35OrcReader
@@ -182,4 +182,8 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
   override def isTimestampNTZType(dataType: DataType): Boolean = {
     dataType == DataTypes.TimestampNTZType
   }
+
+  override def getRebaseSpec(policy: String): RebaseDateTime.RebaseSpec = {
+    RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
index f90e675d768f..e8fc3b3f6479 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
@@ -332,7 +332,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
         DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
        val datetimeRebaseSpec =
          DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
-        val readSupport = new ParquetReadSupport(
+        val readSupport = new HoodieParquetReadSupport(
           convertTz,
           enableVectorizedReader = false,
           datetimeRebaseSpec,
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
index 3f2d97b43629..3d192dd3a170 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
@@ -193,7 +193,7 @@ class Spark35ParquetReader(enableVectorizedReader: Boolean,
       }
     } else {
       // ParquetRecordReader returns InternalRow
-      val readSupport = new ParquetReadSupport(
+      val readSupport = new HoodieParquetReadSupport(
         convertTz,
         enableVectorizedReader = false,
         datetimeRebaseSpec,
diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
index b152ecfb2eab..ede4cdf23ddf 100644
--- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
+import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY, RebaseDateTime}
 import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.orc.Spark40OrcReader
@@ -182,4 +182,8 @@ class Spark4_0Adapter extends BaseSpark4Adapter {
   override def isTimestampNTZType(dataType: DataType): Boolean = {
     dataType == DataTypes.TimestampNTZType
   }
+
+  override def getRebaseSpec(policy: String): RebaseDateTime.RebaseSpec = {
+    RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala
index ad0b33fee239..75190bc135c3 100644
--- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala
@@ -329,7 +329,7 @@ class Spark40LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
         }
       } else {
         logDebug(s"Falling back to parquet-mr")
-        val readSupport = new ParquetReadSupport(
+        val readSupport = new HoodieParquetReadSupport(
           convertTz,
           enableVectorizedReader = false,
           datetimeRebaseSpec,
diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
index 1431308a4092..5e8cea9f11a2 100644
--- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
+++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
@@ -193,7 +193,7 @@ class Spark40ParquetReader(enableVectorizedReader: Boolean,
       }
     } else {
       // ParquetRecordReader returns InternalRow
-      val readSupport = new ParquetReadSupport(
+      val readSupport = new HoodieParquetReadSupport(
         convertTz,
         enableVectorizedReader = false,
         datetimeRebaseSpec,
