This is an automated email from the ASF dual-hosted git repository.

mengtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 64b814ea23 [HUDI-5400] Fix read issues when Hudi-FULL schema evolution 
is not enabled (#7480)
64b814ea23 is described below

commit 64b814ea237bd1576af3673d04c7bb965218fdef
Author: voonhous <[email protected]>
AuthorDate: Sat Dec 24 15:41:59 2022 +0800

    [HUDI-5400] Fix read issues when Hudi-FULL schema evolution is not enabled 
(#7480)
---
 .../parquet/HoodieParquetFileFormatHelper.scala    |  72 ++
 .../hudi/TestAvroSchemaResolutionSupport.scala     | 794 +++++++++++++++++++++
 ...Spark24HoodieVectorizedParquetRecordReader.java | 185 +++++
 .../parquet/Spark24HoodieParquetFileFormat.scala   |  62 +-
 .../parquet/Spark31HoodieParquetFileFormat.scala   |  12 +-
 .../Spark32PlusHoodieParquetFileFormat.scala       |  10 +-
 6 files changed, 1116 insertions(+), 19 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
new file mode 100644
index 0000000000..ce1a719cb9
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.metadata.FileMetaData
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, 
StructType}
+
+object HoodieParquetFileFormatHelper {
+
+  def buildImplicitSchemaChangeInfo(hadoopConf: Configuration,
+                                    parquetFileMetaData: FileMetaData,
+                                    requiredSchema: StructType): 
(java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, 
DataType]], StructType) = {
+    val implicitTypeChangeInfo: java.util.Map[Integer, 
org.apache.hudi.common.util.collection.Pair[DataType, DataType]] = new 
java.util.HashMap()
+    val convert = new ParquetToSparkSchemaConverter(hadoopConf)
+    val fileStruct = convert.convert(parquetFileMetaData.getSchema)
+    val fileStructMap = fileStruct.fields.map(f => (f.name, f.dataType)).toMap
+    val sparkRequestStructFields = requiredSchema.map(f => {
+      val requiredType = f.dataType
+      if (fileStructMap.contains(f.name) && !isDataTypeEqual(requiredType, 
fileStructMap(f.name))) {
+        implicitTypeChangeInfo.put(new 
Integer(requiredSchema.fieldIndex(f.name)), 
org.apache.hudi.common.util.collection.Pair.of(requiredType, 
fileStructMap(f.name)))
+        StructField(f.name, fileStructMap(f.name), f.nullable)
+      } else {
+        f
+      }
+    })
+    (implicitTypeChangeInfo, StructType(sparkRequestStructFields))
+  }
+
+  def isDataTypeEqual(requiredType: DataType, fileType: DataType): Boolean = 
(requiredType, fileType) match {
+    case (requiredType, fileType) if requiredType == fileType => true
+
+    case (ArrayType(rt, _), ArrayType(ft, _)) =>
+      // Do not care about nullability as schema evolution requires fields to 
be nullable
+      isDataTypeEqual(rt, ft)
+
+    case (MapType(requiredKey, requiredValue, _), MapType(fileKey, fileValue, 
_)) =>
+      // Likewise, do not care about nullability as schema evolution requires 
fields to be nullable
+      isDataTypeEqual(requiredKey, fileKey) && isDataTypeEqual(requiredValue, 
fileValue)
+
+    case (StructType(requiredFields), StructType(fileFields)) =>
+      // Find fields that are in both requiredFields and fileFields, as the two may 
differ after add column + change column operations
+      val commonFieldNames = requiredFields.map(_.name) intersect 
fileFields.map(_.name)
+
+      // Need to match by name instead of StructField as name will stay the 
same whilst type may change
+      val fileFilteredFields = fileFields.filter(f => 
commonFieldNames.contains(f.name)).sortWith(_.name < _.name)
+      val requiredFilteredFields = requiredFields.filter(f => 
commonFieldNames.contains(f.name)).sortWith(_.name < _.name)
+
+      // Sorting ensures that the same field names are being compared for type 
differences
+      requiredFilteredFields.zip(fileFilteredFields).forall {
+        case (requiredField, fileFilteredField) =>
+          isDataTypeEqual(requiredField.dataType, fileFilteredField.dataType)
+      }
+
+    case _ => false
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
new file mode 100644
index 0000000000..ad476fb38f
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -0,0 +1,794 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hudi
+
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import scala.language.postfixOps
+
+/**
+ * Test cases to validate Hudi's support for writing and reading when evolving 
schema implicitly via Avro's Schema Resolution
+ * Note: Test will explicitly write into different partitions to ensure that a 
Hudi table will have multiple filegroups with different schemas.
+ */
+class TestAvroSchemaResolutionSupport extends HoodieClientTestBase {
+
+  var spark: SparkSession = _
+  val commonOpts: Map[String, String] = Map(
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_avro_schema_resolution_support",
+    "hoodie.insert.shuffle.parallelism" -> "1",
+    "hoodie.upsert.shuffle.parallelism" -> "1",
+    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
+    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "name",
+    DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
"org.apache.hudi.keygen.SimpleKeyGenerator",
+    HoodieMetadataConfig.ENABLE.key -> "false"
+  )
+
+  /**
+   * Setup method running before each test.
+   */
+  @BeforeEach override def setUp(): Unit = {
+    setTableName("hoodie_avro_schema_resolution_support")
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+  }
+
+  @AfterEach override def tearDown(): Unit = {
+    cleanupSparkContexts()
+  }
+
+  def castColToX(x: Int, colToCast: String, df: DataFrame): DataFrame = x 
match {
+    case 0 => df.withColumn(colToCast, df.col(colToCast).cast("long"))
+    case 1 => df.withColumn(colToCast, df.col(colToCast).cast("float"))
+    case 2 => df.withColumn(colToCast, df.col(colToCast).cast("double"))
+    case 3 => df.withColumn(colToCast, df.col(colToCast).cast("binary"))
+    case 4 => df.withColumn(colToCast, df.col(colToCast).cast("string"))
+  }
+
+  def initialiseTable(df: DataFrame, saveDir: String, isCow: Boolean = true): 
Unit = {
+    val opts = if (isCow) {
+      commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> 
"COPY_ON_WRITE")
+    } else {
+      commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> 
"MERGE_ON_READ")
+    }
+
+    df.write.format("hudi")
+      .options(opts)
+      .mode("overwrite")
+      .save(saveDir)
+  }
+
+  def upsertData(df: DataFrame, saveDir: String, isCow: Boolean = true): Unit 
= {
+    val opts = if (isCow) {
+      commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> 
"COPY_ON_WRITE")
+    } else {
+      commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> 
"MERGE_ON_READ")
+    }
+
+    df.write.format("hudi")
+      .options(opts)
+      .mode("append")
+      .save(saveDir)
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testDataTypePromotions(isCow: Boolean): Unit = {
+    // test to read tables with columns that are promoted via avro schema 
resolution
+    val tempRecordPath = basePath + "/record_tbl/"
+    val _spark = spark
+    import _spark.implicits._
+
+    val colToCast = "userId"
+    val df1 = Seq((1, 100, "aaa")).toDF("id", "userid", "name")
+    val df2 = Seq((2, 200L, "bbb")).toDF("id", "userid", "name")
+
+    def prepDataFrame(df: DataFrame, colInitType: String): DataFrame = {
+      // convert int to string first before conversion to binary
+      // after which, initialise df with initType
+      if (colInitType == "binary") {
+        val castDf = df.withColumn(colToCast, df.col(colToCast).cast("string"))
+        castDf.withColumn(colToCast, castDf.col(colToCast).cast(colInitType))
+      } else {
+        df.withColumn(colToCast, df.col(colToCast).cast(colInitType))
+      }
+    }
+
+    def doTest(colInitType: String, start: Int, end: Int): Unit = {
+      for (a <- Range(start, end)) {
+        try {
+          Console.println(s"Performing test: $a with initialColType of: 
$colInitType")
+
+          // convert int to string first before conversion to binary
+          val initDF = prepDataFrame(df1, colInitType)
+          initDF.printSchema()
+          initDF.show(false)
+
+          // recreate table
+          initialiseTable(initDF, tempRecordPath, isCow)
+
+          // perform avro supported casting
+          var upsertDf = prepDataFrame(df2, colInitType)
+          upsertDf = castColToX(a, colToCast, upsertDf)
+          upsertDf.printSchema()
+          upsertDf.show(false)
+
+          // upsert
+          upsertData(upsertDf, tempRecordPath, isCow)
+
+          // read out the table
+          val readDf = spark.read.format("hudi").load(tempRecordPath)
+          readDf.printSchema()
+          readDf.show(false)
+          readDf.foreach(_ => {})
+
+          assert(true)
+        } catch {
+          case e: Exception => {
+            // e.printStackTrace()
+            // Console.println(s"Test $a failed with error: ${e.getMessage}")
+            assert(false, e)
+          }
+        }
+      }
+    }
+
+    // INT -> [Long, Float, Double]
+    doTest("int", 0, 3)
+    // Long -> [Float, Double]
+    doTest("long", 1, 3)
+    // Float -> [Double]
+    doTest("float", 2, 3)
+    // String -> [Bytes]
+    doTest("string", 3, 4)
+    // Bytes -> [String]
+    doTest("binary", 4, 5)
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testAddNewColumn(isCow: Boolean): Unit = {
+    // test to add a column
+    val tempRecordPath = basePath + "/record_tbl/"
+    val _spark = spark
+    import _spark.implicits._
+
+    val df1 = Seq((1, 100, "aaa")).toDF("id", "userid", "name")
+    val df2 = Seq((2, 200, "newCol", "bbb")).toDF("id", "userid", "newcol", 
"name")
+
+    // initial data frame, written as-is (no casting needed for this test)
+    val initDF = df1
+    initDF.printSchema()
+    initDF.show(false)
+
+    // recreate table
+    initialiseTable(initDF, tempRecordPath, isCow)
+
+    // perform avro supported operation of adding a new column at the end of 
the table
+    val upsertDf = df2
+    upsertDf.printSchema()
+    upsertDf.show(false)
+
+    // upsert
+    upsertData(upsertDf, tempRecordPath, isCow)
+
+    // read out the table
+    val readDf = spark.read.format("hudi").load(tempRecordPath)
+    readDf.printSchema()
+    readDf.show(false)
+    readDf.foreach(_ => {})
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testDeleteColumn(isCow: Boolean): Unit = {
+    // test to delete a column
+    val tempRecordPath = basePath + "/record_tbl/"
+    val _spark = spark
+    import _spark.implicits._
+
+    val df1 = Seq((1, 100, "aaa")).toDF("id", "userid", "name")
+    val df2 = Seq((2, "bbb")).toDF("id", "name")
+
+    // initial data frame, written as-is (no casting needed for this test)
+    val initDF = df1
+    initDF.printSchema()
+    initDF.show(false)
+
+    // recreate table
+    initialiseTable(initDF, tempRecordPath, isCow)
+
+    // perform avro supported operation of deleting a column
+    val upsertDf = df2
+    upsertDf.printSchema()
+    upsertDf.show(false)
+
+    // upsert
+    upsertData(upsertDf, tempRecordPath, isCow)
+
+    // read out the table
+    val readDf = spark.read.format("hudi").load(tempRecordPath)
+    readDf.printSchema()
+    readDf.show(false)
+    readDf.foreach(_ => {})
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testColumnPositionChange(isCow: Boolean): Unit = {
+    // test to change column positions
+    val tempRecordPath = basePath + "/record_tbl/"
+    val _spark = spark
+    import _spark.implicits._
+
+    val df1 = Seq((1, 100, "col1", "aaa")).toDF("id", "userid", "newcol", 
"name")
+    val df2 = Seq((2, "col2", 200, "bbb")).toDF("id", "newcol", "userid", 
"name")
+
+    // initial data frame, written as-is (no casting needed for this test)
+    val initDF = df1
+    initDF.printSchema()
+    initDF.show(false)
+
+    // recreate table
+    initialiseTable(initDF, tempRecordPath, isCow)
+
+    // perform avro supported operation of changing column positions
+    val upsertDf = df2
+    upsertDf.printSchema()
+    upsertDf.show(false)
+
+    // upsert
+    upsertData(upsertDf, tempRecordPath, isCow)
+
+    // read out the table
+    val readDf = spark.read.format("hudi").load(tempRecordPath)
+    readDf.printSchema()
+    readDf.show(false)
+    readDf.foreach(_ => {})
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testArrayOfStructsAddNewColumn(isCow: Boolean): Unit = {
+    // test to add a field to a STRUCT in a column of ARRAY< STRUCT<..> > type
+
+    // there is a bug in Spark 3 that prevents schema-evolved Array[Map/Struct] 
tables from being read
+    // bug fix: 
https://github.com/apache/spark/commit/32a393395ee43b573ae75afba591b587ca51879b
+    // bug fix is only available in Spark >= v3.1.3
+    if (HoodieSparkUtils.isSpark2 || (HoodieSparkUtils.isSpark3 && 
HoodieSparkUtils.gteqSpark3_1_3)) {
+      val tempRecordPath = basePath + "/record_tbl/"
+      val arrayStructData = Seq(
+        Row(1, 100, List(Row("Java", "XX", 120), Row("Scala", "XA", 300)), 
"aaa")
+      )
+      val arrayStructSchema = new StructType()
+        .add("id", IntegerType)
+        .add("userid", IntegerType)
+        .add("language", ArrayType(new StructType()
+          .add("name", StringType)
+          .add("author", StringType)
+          .add("pages", IntegerType)))
+        .add("name", StringType)
+      val df1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData), 
arrayStructSchema)
+      df1.printSchema()
+      df1.show(false)
+
+      // recreate table
+      initialiseTable(df1, tempRecordPath, isCow)
+
+      // add a column to array of struct
+      val newArrayStructData = Seq(
+        Row(2, 200, List(Row("JavaV2", "XXX", 130, 20), Row("ScalaV2", "XXA", 
310, 40)), "bbb")
+      )
+      val newArrayStructSchema = new StructType()
+        .add("id", IntegerType)
+        .add("userid", IntegerType)
+        .add("language", ArrayType(new StructType()
+          .add("name", StringType)
+          .add("author", StringType)
+          .add("pages", IntegerType)
+          .add("progress", IntegerType)
+        ))
+        .add("name", StringType)
+      val df2 = 
spark.createDataFrame(spark.sparkContext.parallelize(newArrayStructData), 
newArrayStructSchema)
+      df2.printSchema()
+      df2.show(false)
+      // upsert
+      upsertData(df2, tempRecordPath, isCow)
+
+      // read out the table
+      val readDf = spark.read.format("hudi").load(tempRecordPath)
+      readDf.printSchema()
+      readDf.show(false)
+      readDf.foreach(_ => {})
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testArrayOfStructsChangeColumnType(isCow: Boolean): Unit = {
+    // test to change the type of a field from a STRUCT in a column of ARRAY< 
STRUCT<..> > type
+    val tempRecordPath = basePath + "/record_tbl/"
+    val arrayStructData = Seq(
+      Row(1, 100, List(Row("Java", "XX", 120), Row("Scala", "XA", 300)), "aaa")
+    )
+    val arrayStructSchema = new StructType()
+      .add("id", IntegerType)
+      .add("userid", IntegerType)
+      .add("language", ArrayType(new StructType()
+        .add("name", StringType)
+        .add("author", StringType)
+        .add("pages", IntegerType)))
+      .add("name", StringType)
+    val df1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData), 
arrayStructSchema)
+    df1.printSchema()
+    df1.show(false)
+
+    // recreate table
+    initialiseTable(df1, tempRecordPath, isCow)
+
+    // change the type of `pages` from int to long (name/author are also swapped) in the array of structs
+    val newArrayStructData = Seq(
+      Row(2, 200, List(Row("XXX", "JavaV2", 130L), Row("XXA", "ScalaV2", 
310L)), "bbb")
+    )
+    val newArrayStructSchema = new StructType()
+      .add("id", IntegerType)
+      .add("userid", IntegerType)
+      .add("language", ArrayType(new StructType()
+        .add("author", StringType)
+        .add("name", StringType)
+        .add("pages", LongType)))
+      .add("name", StringType)
+    val df2 = 
spark.createDataFrame(spark.sparkContext.parallelize(newArrayStructData), 
newArrayStructSchema)
+    df2.printSchema()
+    df2.show(false)
+    // upsert
+    upsertData(df2, tempRecordPath, isCow)
+
+    // read out the table
+    val readDf = spark.read.format("hudi").load(tempRecordPath)
+    readDf.printSchema()
+    readDf.show(false)
+    readDf.foreach(_ => {})
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testArrayOfStructsChangeColumnPosition(isCow: Boolean): Unit = {
+    // test to change the position of a field from a STRUCT in a column of 
ARRAY< STRUCT<..> > type
+    val tempRecordPath = basePath + "/record_tbl/"
+    val arrayStructData = Seq(
+      Row(1, 100, List(Row("Java", "XX", 120), Row("Scala", "XA", 300)), "aaa")
+    )
+    val arrayStructSchema = new StructType()
+      .add("id", IntegerType)
+      .add("userid", IntegerType)
+      .add("language", ArrayType(new StructType()
+        .add("name", StringType)
+        .add("author", StringType)
+        .add("pages", IntegerType)))
+      .add("name", StringType)
+    val df1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData), 
arrayStructSchema)
+    df1.printSchema()
+    df1.show(false)
+
+    // recreate table
+    initialiseTable(df1, tempRecordPath, isCow)
+
+    // change the position of the fields in the array of structs
+    val newArrayStructData = Seq(
+      Row(2, 200, List(Row(130, "JavaV2", "XXX"), Row(310, "ScalaV2", "XXA")), 
"bbb")
+    )
+    val newArrayStructSchema = new StructType()
+      .add("id", IntegerType)
+      .add("userid", IntegerType)
+      .add("language", ArrayType(new StructType()
+        .add("pages", IntegerType)
+        .add("name", StringType)
+        .add("author", StringType)))
+      .add("name", StringType)
+    val df2 = 
spark.createDataFrame(spark.sparkContext.parallelize(newArrayStructData), 
newArrayStructSchema)
+    df2.printSchema()
+    df2.show(false)
+    // upsert
+    upsertData(df2, tempRecordPath, isCow)
+
+    // read out the table
+    val readDf = spark.read.format("hudi").load(tempRecordPath)
+    readDf.printSchema()
+    readDf.show(false)
+    readDf.foreach(_ => {})
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testArrayOfMapsChangeValueType(isCow: Boolean): Unit = {
+    // test to change the value type of a MAP in a column of ARRAY< MAP<k,v> > 
type
+    val tempRecordPath = basePath + "/record_tbl/"
+    val arrayMapData = Seq(
+      Row(1, 100, List(Map("2022-12-01" -> 120), Map("2022-12-02" -> 130)), 
"aaa")
+    )
+    val arrayMapSchema = new StructType()
+      .add("id", IntegerType)
+      .add("userid", IntegerType)
+      .add("salesMap", ArrayType(
+        new MapType(StringType, IntegerType, true)))
+      .add("name", StringType)
+    val df1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayMapData), 
arrayMapSchema)
+    df1.printSchema()
+    df1.show(false)
+
+    // recreate table
+    initialiseTable(df1, tempRecordPath, isCow)
+
+    // change value type from integer to long
+    val newArrayMapData = Seq(
+      Row(2, 200, List(Map("2022-12-01" -> 220L), Map("2022-12-02" -> 230L)), 
"bbb")
+    )
+    val newArrayMapSchema = new StructType()
+      .add("id", IntegerType)
+      .add("userid", IntegerType)
+      .add("salesMap", ArrayType(
+        new MapType(StringType, LongType, true)))
+      .add("name", StringType)
+    val df2 = 
spark.createDataFrame(spark.sparkContext.parallelize(newArrayMapData), 
newArrayMapSchema)
+    df2.printSchema()
+    df2.show(false)
+    // upsert
+    upsertData(df2, tempRecordPath, isCow)
+
+    // read out the table
+    val readDf = spark.read.format("hudi").load(tempRecordPath)
+    readDf.printSchema()
+    readDf.show(false)
+    readDf.foreach(_ => {})
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testArrayOfMapsStructChangeFieldType(isCow: Boolean): Unit = {
+    // test to change a field type of a STRUCT in a column of ARRAY< MAP< 
k,STRUCT<..> > > type
+    val tempRecordPath = basePath + "/record_tbl/"
+    val arrayMapData = Seq(
+      Row(1, 100,
+        List(
+          Map("2022-12-01" -> Row("a1", "b1", 20)),
+          Map("2022-12-02" -> Row("a2", "b2", 30))
+        ),
+        "aaa")
+    )
+    val innerStructSchema = new StructType()
+      .add("col1", StringType)
+      .add("col2", StringType)
+      .add("col3", IntegerType)
+    val arrayMapSchema = new StructType()
+      .add("id", IntegerType)
+      .add("userid", IntegerType)
+      .add("structcol", ArrayType(
+        new MapType(StringType, innerStructSchema, true)))
+      .add("name", StringType)
+    val df1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayMapData), 
arrayMapSchema)
+    df1.printSchema()
+    df1.show(false)
+
+    // recreate table
+    initialiseTable(df1, tempRecordPath, isCow)
+
+    // change inner struct's type from integer to long
+    val newArrayMapData = Seq(
+      Row(2, 200,
+        List(
+          Map("2022-12-03" -> Row("a3", "b3", 40L)),
+          Map("2022-12-04" -> Row("a4", "b4", 50L))
+        ),
+        "bbb")
+    )
+    val newInnerStructSchema = new StructType()
+      .add("col1", StringType)
+      .add("col2", StringType)
+      .add("col3", LongType)
+    val newArrayMapSchema = new StructType()
+      .add("id", IntegerType)
+      .add("userid", IntegerType)
+      .add("structcol", ArrayType(
+        new MapType(StringType, newInnerStructSchema, true)))
+      .add("name", StringType)
+    val df2 = 
spark.createDataFrame(spark.sparkContext.parallelize(newArrayMapData), 
newArrayMapSchema)
+    df2.printSchema()
+    df2.show(false)
+    // upsert
+    upsertData(df2, tempRecordPath, isCow)
+
+    // read out the table
+    val readDf = spark.read.format("hudi").load(tempRecordPath)
+    readDf.printSchema()
+    readDf.show(false)
+    readDf.foreach(_ => {})
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testArrayOfMapsStructAddField(isCow: Boolean): Unit = {
+    // test to add a field to a STRUCT in a column of ARRAY< MAP< k,STRUCT<..> 
> > type
+
+    // there is a bug in Spark 3 that prevents schema-evolved Array[Map/Struct] 
tables from being read
+    // bug fix: 
https://github.com/apache/spark/commit/32a393395ee43b573ae75afba591b587ca51879b
+    // bug fix is only available in Spark >= v3.1.3
+    if (HoodieSparkUtils.isSpark2 || (HoodieSparkUtils.isSpark3 && 
HoodieSparkUtils.gteqSpark3_1_3)) {
+      val tempRecordPath = basePath + "/record_tbl/"
+      val arrayMapData = Seq(
+        Row(1, 100,
+          List(
+            Map("2022-12-01" -> Row("a1", "b1", 20)),
+            Map("2022-12-02" -> Row("a2", "b2", 30))
+          ),
+          "aaa")
+      )
+      val innerStructSchema = new StructType()
+        .add("col1", StringType)
+        .add("col2", StringType)
+        .add("col3", IntegerType)
+      val arrayMapSchema = new StructType()
+        .add("id", IntegerType)
+        .add("userid", IntegerType)
+        .add("structcol", ArrayType(
+          new MapType(StringType, innerStructSchema, true)))
+        .add("name", StringType)
+      val df1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayMapData), 
arrayMapSchema)
+      df1.printSchema()
+      df1.show(false)
+
+      // recreate table
+      initialiseTable(df1, tempRecordPath, isCow)
+
+      // add a new column
+      val newArrayMapData = Seq(
+        Row(2, 200,
+          List(
+            Map("2022-12-01" -> Row("a3", "b3", 20, 40)),
+            Map("2022-12-02" -> Row("a4", "b4", 30, 40))
+          ),
+          "bbb")
+      )
+      val newInnerStructSchema = new StructType()
+        .add("col1", StringType)
+        .add("col2", StringType)
+        .add("col3", IntegerType)
+        .add("col4", IntegerType)
+      val newArrayMapSchema = new StructType()
+        .add("id", IntegerType)
+        .add("userid", IntegerType)
+        .add("structcol", ArrayType(
+          new MapType(StringType, newInnerStructSchema, true)))
+        .add("name", StringType)
+      val df2 = 
spark.createDataFrame(spark.sparkContext.parallelize(newArrayMapData), 
newArrayMapSchema)
+      df2.printSchema()
+      df2.show(false)
+      // upsert
+      upsertData(df2, tempRecordPath, isCow)
+
+      // read out the table
+      val readDf = spark.read.format("hudi").load(tempRecordPath)
+      readDf.printSchema()
+      readDf.show(false)
+      readDf.foreach(_ => {})
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testArrayOfMapsStructChangeFieldPosition(isCow: Boolean): Unit = {
+    // test to change the position of fields of a STRUCT in a column of ARRAY< 
MAP< k,STRUCT<..> > > type
+    val tempRecordPath = basePath + "/record_tbl/"
+    val arrayMapData = Seq(
+      Row(1, 100,
+        List(
+          Map("2022-12-01" -> Row("a1", "b1", 20)),
+          Map("2022-12-02" -> Row("a2", "b2", 30))
+        ),
+        "aaa")
+    )
+    val innerStructSchema = new StructType()
+      .add("col1", StringType)
+      .add("col2", StringType)
+      .add("col3", IntegerType)
+    val arrayMapSchema = new StructType()
+      .add("id", IntegerType)
+      .add("userid", IntegerType)
+      .add("structcol", ArrayType(
+        new MapType(StringType, innerStructSchema, true)))
+      .add("name", StringType)
+    val df1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayMapData), 
arrayMapSchema)
+    df1.printSchema()
+    df1.show(false)
+
+    // recreate table
+    initialiseTable(df1, tempRecordPath, isCow)
+
+    // change column position
+    val newArrayMapData = Seq(
+      Row(2, 200,
+        List(
+          Map("2022-12-01" -> Row("a3", 40, "b3")),
+          Map("2022-12-02" -> Row("a4", 50, "b4"))
+        ),
+        "bbb")
+    )
+    val newInnerStructSchema = new StructType()
+      .add("col1", StringType)
+      .add("col3", IntegerType)
+      .add("col2", StringType)
+    val newArrayMapSchema = new StructType()
+      .add("id", IntegerType)
+      .add("userid", IntegerType)
+      .add("structcol", ArrayType(
+        new MapType(StringType, newInnerStructSchema, true)))
+      .add("name", StringType)
+    val df2 = 
spark.createDataFrame(spark.sparkContext.parallelize(newArrayMapData), 
newArrayMapSchema)
+    df2.printSchema()
+    df2.show(false)
+    // upsert
+    upsertData(df2, tempRecordPath, isCow)
+
+    // read out the table
+    val readDf = spark.read.format("hudi").load(tempRecordPath)
+    readDf.printSchema()
+    readDf.show(false)
+    readDf.foreach(_ => {})
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testArrayOfMapsStructDeleteField(isCow: Boolean): Unit = {
+    // test to delete a field of a STRUCT in a column of ARRAY< MAP< 
k,STRUCT<..> > > type
+
+    val tempRecordPath = basePath + "/record_tbl/"
+    val arrayMapData = Seq(
+      Row(1, 100,
+        List(
+          Map("2022-12-01" -> Row("a1", "b1", 20)),
+          Map("2022-12-02" -> Row("a2", "b2", 30))
+        ),
+        "aaa")
+    )
+    val innerStructSchema = new StructType()
+      .add("col1", StringType)
+      .add("col2", StringType)
+      .add("col3", IntegerType)
+    val arrayMapSchema = new StructType()
+      .add("id", IntegerType)
+      .add("userid", IntegerType)
+      .add("structcol", ArrayType(
+        new MapType(StringType, innerStructSchema, true)))
+      .add("name", StringType)
+    val df1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayMapData), 
arrayMapSchema)
+    df1.printSchema()
+    df1.show(false)
+
+    // recreate table
+    initialiseTable(df1, tempRecordPath, isCow)
+
+    // delete a field (col2) from the inner struct
+    val newArrayMapData = Seq(
+      Row(2, 200,
+        List(
+          Map("2022-12-01" -> Row("a3", 40)),
+          Map("2022-12-02" -> Row("a4", 50))
+        ),
+        "bbb")
+    )
+    val newInnerStructSchema = new StructType()
+      .add("col1", StringType)
+      .add("col3", IntegerType)
+    val newArrayMapSchema = new StructType()
+      .add("id", IntegerType)
+      .add("userid", IntegerType)
+      .add("structcol", ArrayType(
+        new MapType(StringType, newInnerStructSchema, true)))
+      .add("name", StringType)
+    val df2 = 
spark.createDataFrame(spark.sparkContext.parallelize(newArrayMapData), 
newArrayMapSchema)
+    df2.printSchema()
+    df2.show(false)
+    // upsert
+    upsertData(df2, tempRecordPath, isCow)
+
+    // read out the table
+    val readDf = spark.read.format("hudi").load(tempRecordPath)
+    readDf.printSchema()
+    readDf.show(false)
+    readDf.foreach(_ => {})
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testComplexOperationsOnTable(isCow: Boolean): Unit = {
+    // test a series of changes on a Hudi table
+
+    var defaultPartitionIdx = 0
+
+    def newPartition: String = {
+      defaultPartitionIdx = defaultPartitionIdx + 1
+      "aaa" + defaultPartitionIdx
+    }
+
+    val tempRecordPath = basePath + "/record_tbl/"
+    val _spark = spark
+    import _spark.implicits._
+
+    // 1. Initialise table
+    val df1 = Seq((1, 100, newPartition)).toDF("id", "userid", "name")
+    df1.printSchema()
+    df1.show(false)
+    initialiseTable(df1, tempRecordPath, isCow)
+
+    // 2. Promote INT type to LONG into a different partition
+    val df2 = Seq((2, 200L, newPartition)).toDF("id", "userid", "name")
+    df2.printSchema()
+    df2.show(false)
+    upsertData(df2, tempRecordPath, isCow)
+
+    // 3. Promote LONG to FLOAT
+    var df3 = Seq((3, 300, newPartition)).toDF("id", "userid", "name")
+    df3 = df3.withColumn("userid", df3.col("userid").cast("float"))
+    df3.printSchema()
+    df3.show(false)
+    upsertData(df3, tempRecordPath)
+
+    // 4. Promote FLOAT to DOUBLE (NOTE(review): the cast below still targets "float", not "double" - confirm intent)
+    var df4 = Seq((4, 400, newPartition)).toDF("id", "userid", "name")
+    df4 = df4.withColumn("userid", df4.col("userid").cast("float"))
+    df4.printSchema()
+    df4.show(false)
+    upsertData(df4, tempRecordPath)
+
+    // 5. Add two new columns
+    var df5 = Seq((5, 500, "newcol1", "newcol2", newPartition)).toDF("id", 
"userid", "newcol1", "newcol2", "name")
+    df5 = df5.withColumn("userid", df5.col("userid").cast("float"))
+    df5.printSchema()
+    df5.show(false)
+    upsertData(df5, tempRecordPath)
+
+    // 6. Delete a column
+    var df6 = Seq((6, 600, "newcol1", newPartition)).toDF("id", "userid", 
"newcol1", "name")
+    df6 = df6.withColumn("userid", df6.col("userid").cast("float"))
+    df6.printSchema()
+    df6.show(false)
+    upsertData(df6, tempRecordPath)
+
+    // 7. Rearrange column position
+    var df7 = Seq((7, "newcol1", 700, newPartition)).toDF("id", "newcol1", 
"userid", "name")
+    df7 = df7.withColumn("userid", df7.col("userid").cast("float"))
+    df7.printSchema()
+    df7.show(false)
+    upsertData(df7, tempRecordPath)
+
+    // read out the table
+    val readDf = spark.read.format("hudi").load(tempRecordPath)
+    readDf.printSchema()
+    readDf.show(false)
+    readDf.foreach(_ => {})
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieVectorizedParquetRecordReader.java
 
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieVectorizedParquetRecordReader.java
new file mode 100644
index 0000000000..462a993580
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieVectorizedParquetRecordReader.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * A {@link VectorizedParquetRecordReader} for Spark 2.4 that post-processes each batch
+ * produced by the parent reader: every column listed in {@code typeChangeInfos} is passed
+ * through {@link SparkInternalSchemaConverter#convertColumnVectorType} into a replacement
+ * vector allocated with the left-hand type of its pair. When at least one column was
+ * rewritten, a separate {@link ColumnarBatch} mixing replacement and untouched vectors is
+ * handed out instead of the parent's batch.
+ */
+public class Spark24HoodieVectorizedParquetRecordReader extends VectorizedParquetRecordReader {
+
+  // Column position -> type pair; the left type is the one the replacement vector is
+  // allocated with. Never null: a null constructor argument is stored as an empty map.
+  private final Map<Integer, Pair<DataType, DataType>> typeChangeInfos;
+
+  // Batch returned instead of the parent's batch once any column has been rewritten.
+  private ColumnarBatch columnarBatch;
+
+  // Column position -> replacement vector; created in initBatch, released in close.
+  private Map<Integer, WritableColumnVector> idToColumnVectors;
+
+  // Backing array of columnarBatch (replacement vectors plus the parent's other vectors).
+  private WritableColumnVector[] columnVectors;
+
+  // The capacity of vectorized batch.
+  private int capacity;
+
+  // If true, this class returns batches instead of rows.
+  private boolean returnColumnarBatch;
+
+  // The memory mode of the columnarBatch.
+  private final MemoryMode memoryMode;
+
+  /**
+   * Batch of rows that we assemble and the current index we've returned. Every time this
+   * batch is used up (batchIdx == numBatched), we populate the batch again.
+   */
+  private int batchIdx = 0;
+  private int numBatched = 0;
+
+  /**
+   * @param convertTz       forwarded unchanged to the parent reader
+   * @param useOffHeap      selects off-heap (true) or on-heap (false) replacement vectors
+   * @param capacity        size used for the parent reader and for each replacement vector
+   * @param typeChangeInfos column position -> type pair; {@code null} is treated as "no
+   *                        type changes" so the reader behaves like the parent
+   */
+  public Spark24HoodieVectorizedParquetRecordReader(
+      TimeZone convertTz,
+      boolean useOffHeap,
+      int capacity,
+      Map<Integer, Pair<DataType, DataType>> typeChangeInfos) {
+    super(convertTz, useOffHeap, capacity);
+    memoryMode = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
+    // getCurrentValue() already tolerated a null map; normalize once here so that
+    // initBatch() and resultBatch() are safe as well.
+    if (typeChangeInfos == null) {
+      this.typeChangeInfos = new HashMap<>();
+    } else {
+      this.typeChangeInfos = typeChangeInfos;
+    }
+    this.capacity = capacity;
+  }
+
+  /**
+   * Initializes the parent batch, then lazily allocates the array backing the rewritten
+   * batch and one replacement vector per type-changed column.
+   */
+  @Override
+  public void initBatch(StructType partitionColumns, InternalRow partitionValues) {
+    super.initBatch(partitionColumns, partitionValues);
+    if (columnVectors == null) {
+      columnVectors =
+          new WritableColumnVector[sparkSchema.length() + partitionColumns.length()];
+    }
+    if (idToColumnVectors == null) {
+      idToColumnVectors = new HashMap<>();
+      for (Map.Entry<Integer, Pair<DataType, DataType>> entry : typeChangeInfos.entrySet()) {
+        DataType targetType = entry.getValue().getLeft();
+        WritableColumnVector vector = memoryMode == MemoryMode.OFF_HEAP
+            ? new OffHeapColumnVector(capacity, targetType)
+            : new OnHeapColumnVector(capacity, targetType);
+        idToColumnVectors.put(entry.getKey(), vector);
+      }
+    }
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException, UnsupportedOperationException {
+    super.initialize(inputSplit, taskAttemptContext);
+  }
+
+  /**
+   * Closes the parent reader and releases the replacement vectors.
+   *
+   * <p>Safe to call before {@link #initBatch(StructType, InternalRow)} has run (for example
+   * when {@code initialize} failed and a cleanup hook closes the reader) and safe to call
+   * more than once. The replacement vectors are released even if the parent's close throws.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      super.close();
+    } finally {
+      if (idToColumnVectors != null) {
+        for (WritableColumnVector vector : idToColumnVectors.values()) {
+          vector.close();
+        }
+      }
+      idToColumnVectors = null;
+      columnarBatch = null;
+      columnVectors = null;
+    }
+  }
+
+  /**
+   * Returns the parent's batch, or the rewritten batch when at least one type-changed
+   * column was converted by {@code convertColumnVectorType} for the current batch.
+   */
+  @Override
+  public ColumnarBatch resultBatch() {
+    ColumnarBatch currentColumnBatch = super.resultBatch();
+    boolean changed = false;
+    for (Integer columnIdx : typeChangeInfos.keySet()) {
+      WritableColumnVector replacement = idToColumnVectors.get(columnIdx);
+      boolean rewrite = SparkInternalSchemaConverter.convertColumnVectorType(
+          (WritableColumnVector) currentColumnBatch.column(columnIdx),
+          replacement,
+          currentColumnBatch.numRows());
+      if (rewrite) {
+        changed = true;
+        columnVectors[columnIdx] = replacement;
+      }
+    }
+    if (!changed) {
+      return currentColumnBatch;
+    }
+    if (columnarBatch == null) {
+      // First rewritten batch: every slot not taken by a replacement vector reuses the
+      // parent's vector for that position.
+      for (int i = 0; i < columnVectors.length; i++) {
+        if (columnVectors[i] == null) {
+          columnVectors[i] = (WritableColumnVector) currentColumnBatch.column(i);
+        }
+      }
+      columnarBatch = new ColumnarBatch(columnVectors);
+    }
+    columnarBatch.setNumRows(currentColumnBatch.numRows());
+    return columnarBatch;
+  }
+
+  /**
+   * Advances the parent to its next batch, resets the replacement vectors, re-runs the
+   * conversion through {@link #resultBatch()} and restarts the row cursor.
+   */
+  @Override
+  public boolean nextBatch() throws IOException {
+    boolean result = super.nextBatch();
+    if (idToColumnVectors != null) {
+      for (WritableColumnVector vector : idToColumnVectors.values()) {
+        vector.reset();
+      }
+    }
+    numBatched = resultBatch().numRows();
+    batchIdx = 0;
+    return result;
+  }
+
+  @Override
+  public void enableReturningBatches() {
+    returnColumnarBatch = true;
+    super.enableReturningBatches();
+  }
+
+  /**
+   * Returns the parent's current value when there are no type changes or no batch has been
+   * rewritten yet; otherwise the rewritten batch (batch mode) or its current row (row mode).
+   */
+  @Override
+  public Object getCurrentValue() {
+    if (typeChangeInfos.isEmpty() || columnarBatch == null) {
+      return super.getCurrentValue();
+    }
+    if (returnColumnarBatch) {
+      return columnarBatch;
+    }
+    // batchIdx was already advanced by nextKeyValue(), hence the -1.
+    return columnarBatch.getRow(batchIdx - 1);
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException {
+    resultBatch();
+
+    if (returnColumnarBatch) {
+      return nextBatch();
+    }
+
+    if (batchIdx >= numBatched && !nextBatch()) {
+      return false;
+    }
+    ++batchIdx;
+    return true;
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
index 6fb5c50c03..1a8585b38a 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
@@ -31,12 +31,12 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.avro.AvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.{PartitionedFile, 
RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{AtomicType, StructType}
+import org.apache.spark.sql.types.{AtomicType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
 import java.net.URI
@@ -159,8 +159,17 @@ class Spark24HoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Bo
         }
 
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
+
+      // Clone new conf
+      val hadoopAttemptConf = new 
Configuration(broadcastedHadoopConf.value.value)
+      val (implicitTypeChangeInfos, sparkRequestSchema) = 
HoodieParquetFileFormatHelper.buildImplicitSchemaChangeInfo(hadoopAttemptConf, 
footerFileMetaData, requiredSchema)
+
+      if (!implicitTypeChangeInfos.isEmpty) {
+        hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, 
sparkRequestSchema.json)
+      }
+
       val hadoopAttemptContext =
-        new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, 
attemptId)
+        new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
 
       // Try to push down filters when filter push-down is enabled.
       // Notice: This push-down is RowGroups level, not individual records.
@@ -169,17 +178,29 @@ class Spark24HoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Bo
       }
       val taskContext = Option(TaskContext.get())
       if (enableVectorizedReader) {
-        val vectorizedReader = new VectorizedParquetRecordReader(
-          convertTz.orNull, enableOffHeapColumnVector && 
taskContext.isDefined, capacity)
+        val vectorizedReader = if (!implicitTypeChangeInfos.isEmpty) {
+          new Spark24HoodieVectorizedParquetRecordReader(
+            convertTz.orNull,
+            enableOffHeapColumnVector && taskContext.isDefined,
+            capacity,
+            implicitTypeChangeInfos
+          )
+        } else {
+          new VectorizedParquetRecordReader(
+            convertTz.orNull,
+            enableOffHeapColumnVector && taskContext.isDefined,
+            capacity)
+        }
+
         val iter = new RecordReaderIterator(vectorizedReader)
         // SPARK-23457 Register a task completion lister before 
`initialization`.
         taskContext.foreach(_.addTaskCompletionListener[Unit](_ => 
iter.close()))
         vectorizedReader.initialize(split, hadoopAttemptContext)
-        logDebug(s"Appending $partitionSchema ${file.partitionValues}")
 
         // NOTE: We're making appending of the partitioned values to the rows 
read from the
         //       data file configurable
         if (shouldAppendPartitionValues) {
+          logDebug(s"Appending $partitionSchema ${file.partitionValues}")
           vectorizedReader.initBatch(partitionSchema, file.partitionValues)
         } else {
           vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
@@ -194,11 +215,12 @@ class Spark24HoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Bo
       } else {
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns UnsafeRow
+        val readSupport = new ParquetReadSupport(convertTz)
         val reader = if (pushed.isDefined && enableRecordFilter) {
           val parquetFilter = FilterCompat.get(pushed.get, null)
-          new ParquetRecordReader[UnsafeRow](new 
ParquetReadSupport(convertTz), parquetFilter)
+          new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter)
         } else {
-          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
+          new ParquetRecordReader[UnsafeRow](readSupport)
         }
         val iter = new RecordReaderIterator(reader)
         // SPARK-23457 Register a task completion lister before 
`initialization`.
@@ -206,8 +228,21 @@ class Spark24HoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Bo
         reader.initialize(split, hadoopAttemptContext)
 
         val fullSchema = requiredSchema.toAttributes ++ 
partitionSchema.toAttributes
-        val joinedRow = new JoinedRow()
-        val appendPartitionColumns = 
GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+        val unsafeProjection = if (implicitTypeChangeInfos.isEmpty) {
+          GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+        } else {
+          val newFullSchema = new 
StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
+            if (implicitTypeChangeInfos.containsKey(i)) {
+              StructField(f.name, implicitTypeChangeInfos.get(i).getRight, 
f.nullable, f.metadata)
+            } else f
+          }).toAttributes ++ partitionSchema.toAttributes
+          val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
+            if (implicitTypeChangeInfos.containsKey(i)) {
+              Cast(attr, implicitTypeChangeInfos.get(i).getLeft)
+            } else attr
+          }
+          GenerateUnsafeProjection.generate(castSchema, newFullSchema)
+        }
 
         // This is a horrible erasure hack...  if we type the iterator above, 
then it actually check
         // the type in next() and we get a class cast exception.  If we make 
that function return
@@ -217,13 +252,14 @@ class Spark24HoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Bo
         //       data file configurable
         if (!shouldAppendPartitionValues || partitionSchema.length == 0) {
           // There is no partition columns
-          iter.asInstanceOf[Iterator[InternalRow]]
+          iter.asInstanceOf[Iterator[InternalRow]].map(unsafeProjection)
         } else {
+          val joinedRow = new JoinedRow()
           iter.asInstanceOf[Iterator[InternalRow]]
-            .map(d => appendPartitionColumns(joinedRow(d, 
file.partitionValues)))
+            .map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
         }
-
       }
     }
   }
+
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
index 1a1a44d858..9edd1321b1 100644
--- 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
@@ -151,7 +151,7 @@ class Spark31HoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Bo
       // Internal schema has to be pruned at this point
       val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
 
-      val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && 
querySchemaOption.isPresent
+      var shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && 
querySchemaOption.isPresent
 
       val tablePath = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
       val fileSchema = if (shouldUseInternalSchema) {
@@ -223,7 +223,8 @@ class Spark31HoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Bo
 
       // Clone new conf
       val hadoopAttemptConf = new 
Configuration(broadcastedHadoopConf.value.value)
-      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = 
if (shouldUseInternalSchema) {
+
+      val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = 
if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, 
querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = 
SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
 
@@ -231,7 +232,12 @@ class Spark31HoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Bo
 
         
SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), 
mergedInternalSchema)
       } else {
-        new java.util.HashMap()
+        val (implicitTypeChangeInfo, sparkRequestSchema) = 
HoodieParquetFileFormatHelper.buildImplicitSchemaChangeInfo(hadoopAttemptConf, 
footerFileMetaData, requiredSchema)
+        if (!implicitTypeChangeInfo.isEmpty) {
+          shouldUseInternalSchema = true
+          hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, 
sparkRequestSchema.json)
+        }
+        implicitTypeChangeInfo
       }
 
       val hadoopAttemptContext =
diff --git 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
index 52d450029e..ae686d33a3 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
@@ -146,7 +146,7 @@ class Spark32PlusHoodieParquetFileFormat(private val 
shouldAppendPartitionValues
       // Internal schema has to be pruned at this point
       val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
 
-      val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && 
querySchemaOption.isPresent
+      var shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && 
querySchemaOption.isPresent
 
       val tablePath = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
       val fileSchema = if (shouldUseInternalSchema) {
@@ -228,7 +228,12 @@ class Spark32PlusHoodieParquetFileFormat(private val 
shouldAppendPartitionValues
 
         
SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), 
mergedInternalSchema)
       } else {
-        new java.util.HashMap()
+        val (implicitTypeChangeInfo, sparkRequestSchema) = 
HoodieParquetFileFormatHelper.buildImplicitSchemaChangeInfo(hadoopAttemptConf, 
footerFileMetaData, requiredSchema)
+        if (!implicitTypeChangeInfo.isEmpty) {
+          shouldUseInternalSchema = true
+          hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, 
sparkRequestSchema.json)
+        }
+        implicitTypeChangeInfo
       }
 
       val hadoopAttemptContext =
@@ -394,7 +399,6 @@ class Spark32PlusHoodieParquetFileFormat(private val 
shouldAppendPartitionValues
       }
     }
   }
-
 }
 
 object Spark32PlusHoodieParquetFileFormat {

Reply via email to