This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 54700538ab2 [HUDI-8011] allow schema.on.read with positional merging 
(#11671)
54700538ab2 is described below

commit 54700538ab2fb7c0041b461fd16048713a09db22
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Jul 23 21:34:24 2024 -0400

    [HUDI-8011] allow schema.on.read with positional merging (#11671)
    
    * allow schema.on.read with positional merging
    
    * use full name so builds
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../SparkFileFormatInternalRowReaderContext.scala  |  3 ++-
 .../datasources/parquet/SparkParquetReader.scala   | 14 ++++++----
 .../read/HoodieFileGroupReaderSchemaHandler.java   | 18 +++++++++++--
 .../read/HoodiePositionBasedSchemaHandler.java     | 27 ++++++++++++++++++-
 .../hudi/internal/schema/InternalSchema.java       | 30 ++++++++++-----------
 ...odieFileGroupReaderBasedParquetFileFormat.scala |  6 ++---
 .../parquet/SparkParquetReaderBase.scala           | 31 +++++++++++++---------
 .../parquet/TestSparkParquetReaderFormat.scala     |  3 ++-
 .../TestSpark35RecordPositionMetadataColumn.scala  |  3 ++-
 .../datasources/parquet/Spark24ParquetReader.scala |  4 +++
 .../Spark3ParquetSchemaEvolutionUtils.scala        | 25 +++++++----------
 .../datasources/parquet/Spark30ParquetReader.scala | 18 ++++++++-----
 .../Spark30ParquetSchemaEvolutionUtils.scala       |  8 ++++--
 .../datasources/parquet/Spark31ParquetReader.scala | 20 +++++++++-----
 .../Spark31ParquetSchemaEvolutionUtils.scala       |  8 ++++--
 .../datasources/parquet/Spark32ParquetReader.scala | 18 ++++++++-----
 .../Spark32PlusParquetSchemaEvolutionUtils.scala   | 10 ++++---
 .../datasources/parquet/Spark33ParquetReader.scala | 17 +++++++-----
 .../datasources/parquet/Spark34ParquetReader.scala | 18 ++++++++-----
 .../datasources/parquet/Spark35ParquetReader.scala | 28 +++++++++++--------
 20 files changed, 200 insertions(+), 109 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 074e41b6fef..89e4b7b1040 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -103,7 +103,8 @@ class 
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
         .createPartitionedFile(InternalRow.empty, filePath, start, length)
       val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType, 
hasRowIndexField)
       new CloseableInternalRowIterator(parquetFileReader.read(fileInfo,
-        readSchema, StructType(Seq.empty), readFilters, 
storage.getConf.asInstanceOf[StorageConfiguration[Configuration]]))
+        readSchema, StructType(Seq.empty), 
getSchemaHandler.getInternalSchemaOpt,
+        readFilters, 
storage.getConf.asInstanceOf[StorageConfiguration[Configuration]]))
     }
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala
index d1f6826a2e1..8188f1d0a19 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala
@@ -19,6 +19,8 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.storage.StorageConfiguration
 
 import org.apache.hadoop.conf.Configuration
@@ -31,16 +33,18 @@ trait SparkParquetReader extends Serializable {
   /**
    * Read an individual parquet file
    *
-   * @param file            parquet file to read
-   * @param requiredSchema  desired output schema of the data
-   * @param partitionSchema schema of the partition columns. Partition values 
will be appended to the end of every row
-   * @param filters         filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
-   * @param storageConf     the hadoop conf
+   * @param file               parquet file to read
+   * @param requiredSchema     desired output schema of the data
+   * @param partitionSchema    schema of the partition columns. Partition 
values will be appended to the end of every row
+   * @param internalSchemaOpt  option of internal schema for schema.on.read
+   * @param filters            filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
+   * @param storageConf        the hadoop conf
    * @return iterator of rows read from the file output type says 
[[InternalRow]] but could be [[ColumnarBatch]]
    */
   def read(file: PartitionedFile,
            requiredSchema: StructType,
            partitionSchema: StructType,
+           internalSchemaOpt: util.Option[InternalSchema],
            filters: Seq[Filter],
            storageConf: StorageConfiguration[Configuration]): 
Iterator[InternalRow]
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
index 8a962fdc2a7..46d2fdd0c5e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
@@ -58,6 +58,7 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
 
   protected final InternalSchema internalSchema;
 
+  protected final Option<InternalSchema> internalSchemaOpt;
 
   protected final HoodieTableConfig hoodieTableConfig;
 
@@ -84,6 +85,7 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
     this.hoodieTableConfig = hoodieTableConfig;
     this.requiredSchema = prepareRequiredSchema();
     this.internalSchema = pruneInternalSchema(requiredSchema, 
internalSchemaOpt);
+    this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
     readerContext.setNeedsBootstrapMerge(this.needsBootstrapMerge);
   }
 
@@ -103,6 +105,10 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
     return this.internalSchema;
   }
 
+  public Option<InternalSchema> getInternalSchemaOpt() {
+    return this.internalSchemaOpt;
+  }
+
   public Option<UnaryOperator<T>> getOutputConverter() {
     if (!requestedSchema.equals(requiredSchema)) {
       return Option.of(readerContext.projectRecord(requiredSchema, 
requestedSchema));
@@ -110,7 +116,7 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
     return Option.empty();
   }
 
-  private static InternalSchema pruneInternalSchema(Schema requiredSchema, 
Option<InternalSchema> internalSchemaOption) {
+  private InternalSchema pruneInternalSchema(Schema requiredSchema, 
Option<InternalSchema> internalSchemaOption) {
     if (!internalSchemaOption.isPresent()) {
       return InternalSchema.getEmptyInternalSchema();
     }
@@ -119,7 +125,15 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
       return InternalSchema.getEmptyInternalSchema();
     }
 
-    return 
AvroInternalSchemaConverter.pruneAvroSchemaToInternalSchema(requiredSchema, 
notPruned);
+    return doPruneInternalSchema(requiredSchema, notPruned);
+  }
+
+  protected Option<InternalSchema> getInternalSchemaOpt(Option<InternalSchema> 
internalSchemaOpt) {
+    return internalSchemaOpt;
+  }
+
+  protected InternalSchema doPruneInternalSchema(Schema requiredSchema, 
InternalSchema internalSchema) {
+    return 
AvroInternalSchemaConverter.pruneAvroSchemaToInternalSchema(requiredSchema, 
internalSchema);
   }
 
   private Schema generateRequiredSchema() {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
index 87c4266e350..2336fb188be 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
@@ -24,6 +24,9 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.utils.SchemaChangeUtils;
 
 import org.apache.avro.Schema;
 
@@ -31,6 +34,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static 
org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaDedupNested;
+import static 
org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME;
 
 /**
  * This class is responsible for handling the schema for the file group reader 
that supports positional merge.
@@ -52,6 +56,27 @@ public class HoodiePositionBasedSchemaHandler<T> extends 
HoodieFileGroupReaderSc
         : preMergeSchema;
   }
 
+  @Override
+  protected Option<InternalSchema> getInternalSchemaOpt(Option<InternalSchema> 
internalSchemaOpt) {
+    return 
internalSchemaOpt.map(HoodiePositionBasedSchemaHandler::addPositionalMergeCol);
+  }
+
+  @Override
+  protected InternalSchema doPruneInternalSchema(Schema requiredSchema, 
InternalSchema internalSchema) {
+    if (!(readerContext.getShouldMergeUseRecordPosition() && 
readerContext.getHasLogFiles())) {
+      return super.doPruneInternalSchema(requiredSchema, internalSchema);
+    }
+
+    InternalSchema withRowIndex = addPositionalMergeCol(internalSchema);
+    return super.doPruneInternalSchema(requiredSchema, withRowIndex);
+  }
+
+  private static InternalSchema addPositionalMergeCol(InternalSchema 
internalSchema) {
+    TableChanges.ColumnAddChange addChange = 
TableChanges.ColumnAddChange.get(internalSchema);
+    addChange.addColumns("", ROW_INDEX_TEMPORARY_COLUMN_NAME, 
Types.LongType.get(), null);
+    return SchemaChangeUtils.applyTableChanges2Schema(internalSchema, 
addChange);
+  }
+
   @Override
   public Pair<List<Schema.Field>,List<Schema.Field>> 
getBootstrapRequiredFields() {
     Pair<List<Schema.Field>,List<Schema.Field>> dataAndMetaCols = 
super.getBootstrapRequiredFields();
@@ -69,7 +94,7 @@ public class HoodiePositionBasedSchemaHandler<T> extends 
HoodieFileGroupReaderSc
   }
 
   private static Schema.Field getPositionalMergeField() {
-    return new 
Schema.Field(HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME,
+    return new Schema.Field(ROW_INDEX_TEMPORARY_COLUMN_NAME,
         Schema.create(Schema.Type.LONG), "", -1L);
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java
index d1d36fcbc91..5df9cd6f22e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java
@@ -71,15 +71,13 @@ public class InternalSchema implements Serializable {
     this.maxColumnId = maxColumnId;
     this.versionId = versionId;
     this.record = recordType;
-    buildIdToName();
+    getIdToName();
   }
 
   public InternalSchema(long versionId, RecordType recordType) {
     this.versionId = versionId;
     this.record = recordType;
-    this.idToName = recordType.fields().isEmpty()
-        ? Collections.emptyMap()
-        : InternalSchemaBuilder.getBuilder().buildIdToName(record);
+    this.idToName = buildIdToName(record);
     this.nameToId = recordType.fields().isEmpty()
         ? Collections.emptyMap()
         : 
idToName.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, 
Map.Entry::getKey));
@@ -90,9 +88,15 @@ public class InternalSchema implements Serializable {
     return record;
   }
 
-  private Map<Integer, String> buildIdToName() {
+  private static Map<Integer, String> buildIdToName(RecordType record) {
+    return record.fields().isEmpty()
+        ? Collections.emptyMap()
+        : InternalSchemaBuilder.getBuilder().buildIdToName(record);
+  }
+
+  private Map<Integer, String> getIdToName() {
     if (idToName == null) {
-      idToName = InternalSchemaBuilder.getBuilder().buildIdToName(record);
+      idToName = buildIdToName(record);
     }
     return idToName;
   }
@@ -168,10 +172,7 @@ public class InternalSchema implements Serializable {
    * @return full name of field corresponding to id
    */
   public String findFullName(int id) {
-    if (idToName == null) {
-      buildIdToName();
-    }
-    String result = idToName.get(id);
+    String result = getIdToName().get(id);
     return result == null ? "" : result;
   }
 
@@ -210,10 +211,7 @@ public class InternalSchema implements Serializable {
    * Returns all field ids
    */
   public Set<Integer> getAllIds() {
-    if (idToName == null) {
-      buildIdToName();
-    }
-    return idToName.keySet();
+    return getIdToName().keySet();
   }
 
   /**
@@ -255,9 +253,9 @@ public class InternalSchema implements Serializable {
     if (caseSensitive) {
       // In case we do a case-sensitive check we just need to validate whether
       // schema contains field-name as it is
-      return idToName.containsValue(colName);
+      return getIdToName().containsValue(colName);
     } else {
-      return idToName.values()
+      return getIdToName().values()
           .stream()
           .map(fieldName -> fieldName.toLowerCase(Locale.ROOT))
           .collect(Collectors.toSet())
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 31404c3e786..9c9a4cca7be 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -29,7 +29,6 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.read.HoodieFileGroupReader
 import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.utils.SerDeHelper
 import org.apache.hudi.storage.StorageConfiguration
 import org.apache.hudi.storage.hadoop.{HadoopStorageConfiguration, 
HoodieHadoopStorage}
 import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
@@ -179,7 +178,7 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                   fileSliceMapping.getPartitionValues)
               }
 
-            case _ => parquetFileReader.value.read(file, requiredSchema, 
partitionSchema, filters, storageConf)
+            case _ => parquetFileReader.value.read(file, requiredSchema, 
partitionSchema, internalSchemaOpt, filters, storageConf)
           }
         // CDC queries.
         case hoodiePartitionCDCFileGroupSliceMapping: 
HoodiePartitionCDCFileGroupMapping =>
@@ -189,14 +188,13 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
             fileGroupSplit, cdcFileReader.asInstanceOf[PartitionedFile => 
Iterator[InternalRow]],
             storageConf, fileIndexProps, requiredSchema)
 
-        case _ => parquetFileReader.value.read(file, requiredSchema, 
partitionSchema, filters, storageConf)
+        case _ => parquetFileReader.value.read(file, requiredSchema, 
partitionSchema, internalSchemaOpt, filters, storageConf)
       }
     }
   }
 
   private def setSchemaEvolutionConfigs(conf: StorageConfiguration[_]): Unit = 
{
     if (internalSchemaOpt.isPresent) {
-      conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, 
SerDeHelper.toJson(internalSchemaOpt.get()))
       conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, 
tableState.tablePath)
       conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, 
validCommits)
     }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
index ddfa4ba6837..56738595c26 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
@@ -19,8 +19,12 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.storage.StorageConfiguration
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.internal.SQLConf
@@ -43,41 +47,44 @@ abstract class 
SparkParquetReaderBase(enableVectorizedReader: Boolean,
   /**
    * Read an individual parquet file
    *
-   * @param file            parquet file to read
-   * @param requiredSchema  desired output schema of the data
-   * @param partitionSchema schema of the partition columns. Partition values 
will be appended to the end of every row
-   * @param filters         filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
-   * @param storageConf     the hadoop conf
+   * @param file               parquet file to read
+   * @param requiredSchema     desired output schema of the data
+   * @param partitionSchema    schema of the partition columns. Partition 
values will be appended to the end of every row
+   * @param internalSchemaOpt  option of internal schema for schema.on.read
+   * @param filters            filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
+   * @param storageConf        the hadoop conf
    * @return iterator of rows read from the file output type says 
[[InternalRow]] but could be [[ColumnarBatch]]
    */
   final def read(file: PartitionedFile,
                  requiredSchema: StructType,
                  partitionSchema: StructType,
+                 internalSchemaOpt: util.Option[InternalSchema],
                  filters: Seq[Filter],
                  storageConf: StorageConfiguration[Configuration]): 
Iterator[InternalRow] = {
     val conf = storageConf.unwrapCopy()
     conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, 
requiredSchema.json)
     conf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, requiredSchema.json)
     ParquetWriteSupport.setSchema(requiredSchema, conf)
-    doRead(file, requiredSchema, partitionSchema, filters, conf)
+    doRead(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, 
conf)
   }
 
   /**
    * Implemented for each spark version
    *
-   * @param file            parquet file to read
-   * @param requiredSchema  desired output schema of the data
-   * @param partitionSchema schema of the partition columns. Partition values 
will be appended to the end of every row
-   * @param filters         filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
-   * @param sharedConf      the hadoop conf
+   * @param file               parquet file to read
+   * @param requiredSchema     desired output schema of the data
+   * @param partitionSchema    schema of the partition columns. Partition 
values will be appended to the end of every row
+   * @param internalSchemaOpt  option of internal schema for schema.on.read
+   * @param filters            filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
+   * @param sharedConf         the hadoop conf
    * @return iterator of rows read from the file output type says 
[[InternalRow]] but could be [[ColumnarBatch]]
    */
   protected def doRead(file: PartitionedFile,
                        requiredSchema: StructType,
                        partitionSchema: StructType,
+                       internalSchemaOpt: util.Option[InternalSchema],
                        filters: Seq[Filter],
                        sharedConf: Configuration): Iterator[InternalRow]
-
 }
 
 trait SparkParquetReaderBuilder {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala
index be409e50534..e3c1f3cdefc 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.util
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -50,7 +51,7 @@ class TestSparkParquetReaderFormat extends ParquetFileFormat 
with SparkAdapterSu
 
     (file: PartitionedFile) => {
       //code inside the lambda will run on the executor
-      reader.read(file, requiredSchema, partitionSchema, filters, 
broadcastedStorageConf.value)
+      reader.read(file, requiredSchema, partitionSchema, util.Option.empty(), 
filters, broadcastedStorageConf.value)
     }
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
index 4b6196761ad..59dc89c5a21 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
@@ -22,6 +22,7 @@ package org.apache.hudi.common.table.read
 import org.apache.hudi.SparkAdapterSupport.sparkAdapter
 import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
 import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.util
 import org.apache.hudi.common.testutils.HoodieTestTable
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
@@ -112,7 +113,7 @@ class TestSpark35RecordPositionMetadataColumn extends 
SparkClientFunctionalTestH
           0,
           allBaseFiles.get(0).getLength)
       val iterator = new CloseableInternalRowIterator(reader.read(fileInfo, 
requiredSchema,
-        StructType(Seq.empty), Seq.empty, new 
HadoopStorageConfiguration(hadoopConf)))
+        StructType(Seq.empty), util.Option.empty(), Seq.empty, new 
HadoopStorageConfiguration(hadoopConf)))
       var rowIndices: Set[Long] = Set()
       while (iterator.hasNext) {
         val row = iterator.next()
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala
index 42808f337b7..a0ef97a2266 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala
@@ -18,6 +18,9 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
@@ -81,6 +84,7 @@ class Spark24ParquetReader(enableVectorizedReader: Boolean,
   protected def doRead(file: PartitionedFile,
                        requiredSchema: StructType,
                        partitionSchema: StructType,
+                       internalSchemaOpt: 
org.apache.hudi.common.util.Option[InternalSchema],
                        filters: Seq[Filter],
                        sharedConf: Configuration): Iterator[InternalRow] = {
     assert(file.partitionValues.numFields == partitionSchema.size)
diff --git 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala
 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala
index d2b2bfad3cf..4996a668af5 100644
--- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala
@@ -25,11 +25,10 @@ import 
org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util
 import org.apache.hudi.common.util.InternalSchemaCache
-import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
-import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
 import org.apache.parquet.hadoop.metadata.FileMetaData
 import org.apache.spark.sql.HoodieSchemaUtils
@@ -45,13 +44,12 @@ import 
scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
 abstract class Spark3ParquetSchemaEvolutionUtils(sharedConf: Configuration,
                                                  filePath: Path,
                                                  requiredSchema: StructType,
-                                                 partitionSchema: StructType) 
extends SparkAdapterSupport{
+                                                 partitionSchema: StructType,
+                                                 internalSchemaOpt: 
util.Option[InternalSchema]) extends SparkAdapterSupport{
   // Fetch internal schema
-  private lazy val internalSchemaStr: String = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+  private lazy val querySchemaOption: util.Option[InternalSchema] = 
pruneInternalSchema(internalSchemaOpt, requiredSchema)
 
-  private lazy val querySchemaOption: util.Option[InternalSchema] = 
pruneInternalSchema(internalSchemaStr, requiredSchema)
-
-  var shouldUseInternalSchema: Boolean = !isNullOrEmpty(internalSchemaStr) && 
querySchemaOption.isPresent
+  var shouldUseInternalSchema: Boolean = querySchemaOption.isPresent
 
   private lazy val schemaUtils: HoodieSchemaUtils = sparkAdapter.getSchemaUtils
 
@@ -183,16 +181,11 @@ abstract class 
Spark3ParquetSchemaEvolutionUtils(sharedConf: Configuration,
 }
 
 object Spark3ParquetSchemaEvolutionUtils {
-  def pruneInternalSchema(internalSchemaStr: String, requiredSchema: 
StructType): util.Option[InternalSchema] = {
-    if (!isNullOrEmpty(internalSchemaStr) ) {
-      val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
-      if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
-        
util.Option.of(SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema,
 querySchemaOption.get()))
-      } else {
-        querySchemaOption
-      }
+  def pruneInternalSchema(internalSchemaOpt:  util.Option[InternalSchema], 
requiredSchema: StructType): util.Option[InternalSchema] = {
+    if (internalSchemaOpt.isPresent && requiredSchema.nonEmpty) {
+      
util.Option.of(SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema,
 internalSchemaOpt.get()))
     } else {
-      util.Option.empty()
+      internalSchemaOpt
     }
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala
 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala
index a6e9a3c5fe0..22042c964bc 100644
--- 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala
@@ -19,6 +19,9 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
@@ -70,16 +73,18 @@ class Spark30ParquetReader(enableVectorizedReader: Boolean,
    * Read an individual parquet file
    * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark 
v3.0.3 adapted here
    *
-   * @param file            parquet file to read
-   * @param requiredSchema  desired output schema of the data
-   * @param partitionSchema schema of the partition columns. Partition values 
will be appended to the end of every row
-   * @param filters         filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
-   * @param sharedConf      the hadoop conf
+   * @param file               parquet file to read
+   * @param requiredSchema     desired output schema of the data
+   * @param partitionSchema    schema of the partition columns. Partition 
values will be appended to the end of every row
+   * @param internalSchemaOpt  option of internal schema for schema.on.read
+   * @param filters            filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
+   * @param sharedConf         the hadoop conf
    * @return iterator of rows read from the file output type says 
[[InternalRow]] but could be [[ColumnarBatch]]
    */
   protected def doRead(file: PartitionedFile,
                        requiredSchema: StructType,
                        partitionSchema: StructType,
+                       internalSchemaOpt: 
org.apache.hudi.common.util.Option[InternalSchema],
                        filters: Seq[Filter],
                        sharedConf: Configuration): Iterator[InternalRow] = {
     assert(file.partitionValues.numFields == partitionSchema.size)
@@ -94,7 +99,8 @@ class Spark30ParquetReader(enableVectorizedReader: Boolean,
         Array.empty,
         null)
 
-    val schemaEvolutionUtils = new 
Spark30ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema, 
partitionSchema)
+    val schemaEvolutionUtils = new 
Spark30ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
+      partitionSchema, internalSchemaOpt)
 
     lazy val footerFileMetaData =
       ParquetFileReader.readFooter(sharedConf, filePath, 
SKIP_ROW_GROUPS).getFileMetaData
diff --git 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetSchemaEvolutionUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetSchemaEvolutionUtils.scala
index 28acb6e0474..2fccf6a7a72 100644
--- 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetSchemaEvolutionUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetSchemaEvolutionUtils.scala
@@ -19,6 +19,9 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import 
org.apache.spark.sql.execution.datasources.Spark3ParquetSchemaEvolutionUtils
@@ -29,8 +32,9 @@ import java.time.ZoneId
 class Spark30ParquetSchemaEvolutionUtils(sharedConf: Configuration,
                                          filePath: Path,
                                          requiredSchema: StructType,
-                                         partitionSchema: StructType) extends
-  Spark3ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema, 
partitionSchema) {
+                                         partitionSchema: StructType,
+                                         internalSchemaOpt: 
util.Option[InternalSchema]) extends
+  Spark3ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema, 
partitionSchema, internalSchemaOpt) {
 
   def buildVectorizedReader(convertTz: ZoneId,
                             datetimeRebaseMode: String,
diff --git 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala
 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala
index a3e179fa184..bd86f57eefb 100644
--- 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala
@@ -19,6 +19,9 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
@@ -71,16 +74,18 @@ class Spark31ParquetReader(enableVectorizedReader: Boolean,
    * Read an individual parquet file
    * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark 
v3.1.3 adapted here
    *
-   * @param file            parquet file to read
-   * @param requiredSchema  desired output schema of the data
-   * @param partitionSchema schema of the partition columns. Partition values 
will be appended to the end of every row
-   * @param filters         filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
-   * @param sharedConf      the hadoop conf
+   * @param file               parquet file to read
+   * @param requiredSchema     desired output schema of the data
+   * @param partitionSchema    schema of the partition columns. Partition 
values will be appended to the end of every row
+   * @param internalSchemaOpt  option of internal schema for schema.on.read
+   * @param filters            filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
+   * @param sharedConf         the hadoop conf
    * @return iterator of rows read from the file output type says 
[[InternalRow]] but could be [[ColumnarBatch]]
    */
   protected def doRead(file: PartitionedFile,
-                        requiredSchema: StructType,
+                       requiredSchema: StructType,
                        partitionSchema: StructType,
+                       internalSchemaOpt: 
org.apache.hudi.common.util.Option[InternalSchema],
                        filters: Seq[Filter],
                        sharedConf: Configuration): Iterator[InternalRow] = {
     assert(file.partitionValues.numFields == partitionSchema.size)
@@ -95,7 +100,8 @@ class Spark31ParquetReader(enableVectorizedReader: Boolean,
         Array.empty,
         null)
 
-    val schemaEvolutionUtils = new 
Spark31ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema, 
partitionSchema)
+    val schemaEvolutionUtils = new 
Spark31ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
+      partitionSchema, internalSchemaOpt)
 
     lazy val footerFileMetaData =
       ParquetFileReader.readFooter(sharedConf, filePath, 
SKIP_ROW_GROUPS).getFileMetaData
diff --git 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetSchemaEvolutionUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetSchemaEvolutionUtils.scala
index d31efde4d29..a81d57924d7 100644
--- 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetSchemaEvolutionUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetSchemaEvolutionUtils.scala
@@ -19,6 +19,9 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import 
org.apache.spark.sql.execution.datasources.Spark3ParquetSchemaEvolutionUtils
@@ -29,8 +32,9 @@ import java.time.ZoneId
 class Spark31ParquetSchemaEvolutionUtils(sharedConf: Configuration,
                                          filePath: Path,
                                          requiredSchema: StructType,
-                                         partitionSchema: StructType) extends
-  Spark3ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema, 
partitionSchema) {
+                                         partitionSchema: StructType,
+                                         internalSchemaOpt: 
util.Option[InternalSchema]) extends
+  Spark3ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema, 
partitionSchema, internalSchemaOpt) {
 
   def buildVectorizedReader(convertTz: ZoneId,
                             datetimeRebaseMode: String,
diff --git 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala
 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala
index e2189e5cbdf..6b723702244 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala
@@ -19,6 +19,9 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.FileSplit
@@ -74,16 +77,18 @@ class Spark32ParquetReader(enableVectorizedReader: Boolean,
    * Read an individual parquet file
    * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark 
v3.2.4 adapted here
    *
-   * @param file            parquet file to read
-   * @param requiredSchema  desired output schema of the data
-   * @param partitionSchema schema of the partition columns. Partition values 
will be appended to the end of every row
-   * @param filters         filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
-   * @param sharedConf      the hadoop conf
+   * @param file               parquet file to read
+   * @param requiredSchema     desired output schema of the data
+   * @param partitionSchema    schema of the partition columns. Partition 
values will be appended to the end of every row
+   * @param internalSchemaOpt  option of internal schema for schema.on.read
+   * @param filters            filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
+   * @param sharedConf         the hadoop conf
    * @return iterator of rows read from the file output type says 
[[InternalRow]] but could be [[ColumnarBatch]]
    */
   protected def doRead(file: PartitionedFile,
                        requiredSchema: StructType,
                        partitionSchema: StructType,
+                       internalSchemaOpt: 
org.apache.hudi.common.util.Option[InternalSchema],
                        filters: Seq[Filter],
                        sharedConf: Configuration): Iterator[InternalRow] = {
     assert(file.partitionValues.numFields == partitionSchema.size)
@@ -91,7 +96,8 @@ class Spark32ParquetReader(enableVectorizedReader: Boolean,
     val filePath = new Path(new URI(file.filePath))
     val split = new FileSplit(filePath, file.start, file.length, 
Array.empty[String])
 
-    val schemaEvolutionUtils = new 
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema, 
partitionSchema)
+    val schemaEvolutionUtils = new 
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
+      partitionSchema, internalSchemaOpt)
 
     lazy val footerFileMetaData =
       ParquetFooterReader.readFooter(sharedConf, filePath, 
SKIP_ROW_GROUPS).getFileMetaData
diff --git 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusParquetSchemaEvolutionUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusParquetSchemaEvolutionUtils.scala
index 05523bcd9f0..946c4a99d7d 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusParquetSchemaEvolutionUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusParquetSchemaEvolutionUtils.scala
@@ -18,10 +18,11 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.spark.TaskContext
-import org.apache.spark.sql.catalyst.util.RebaseDateTime
 import 
org.apache.spark.sql.execution.datasources.Spark3ParquetSchemaEvolutionUtils
 import org.apache.spark.sql.types.StructType
 
@@ -30,8 +31,9 @@ import java.time.ZoneId
 class Spark32PlusParquetSchemaEvolutionUtils(sharedConf: Configuration,
                                              filePath: Path,
                                              requiredSchema: StructType,
-                                             partitionSchema: StructType) 
extends
-  Spark3ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema, 
partitionSchema) {
+                                             partitionSchema: StructType,
+                                             internalSchemaOpt: 
util.Option[InternalSchema]) extends
+  Spark3ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema, 
partitionSchema, internalSchemaOpt) {
 
   def buildVectorizedReader(convertTz: ZoneId,
                             datetimeRebaseMode: String,
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
index 8d882d7945d..8a7961fc978 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
@@ -19,6 +19,8 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -74,16 +76,18 @@ class Spark33ParquetReader(enableVectorizedReader: Boolean,
    * Read an individual parquet file
    * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark 
v3.3.4 adapted here
    *
-   * @param file            parquet file to read
-   * @param requiredSchema  desired output schema of the data
-   * @param partitionSchema schema of the partition columns. Partition values 
will be appended to the end of every row
-   * @param filters         filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
-   * @param sharedConf      the hadoop conf
+   * @param file               parquet file to read
+   * @param requiredSchema     desired output schema of the data
+   * @param partitionSchema    schema of the partition columns. Partition 
values will be appended to the end of every row
+   * @param internalSchemaOpt  option of internal schema for schema.on.read
+   * @param filters            filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
+   * @param sharedConf         the hadoop conf
    * @return iterator of rows read from the file output type says 
[[InternalRow]] but could be [[ColumnarBatch]]
    */
   protected def doRead(file: PartitionedFile,
                        requiredSchema: StructType,
                        partitionSchema: StructType,
+                       internalSchemaOpt: 
org.apache.hudi.common.util.Option[InternalSchema],
                        filters: Seq[Filter],
                        sharedConf: Configuration): Iterator[InternalRow] = {
     assert(file.partitionValues.numFields == partitionSchema.size)
@@ -91,7 +95,8 @@ class Spark33ParquetReader(enableVectorizedReader: Boolean,
     val filePath = new Path(new URI(file.filePath))
     val split = new FileSplit(filePath, file.start, file.length, 
Array.empty[String])
 
-    val schemaEvolutionUtils = new 
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema, 
partitionSchema)
+    val schemaEvolutionUtils = new 
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
+      partitionSchema, internalSchemaOpt)
 
     lazy val footerFileMetaData =
       ParquetFooterReader.readFooter(sharedConf, filePath, 
SKIP_ROW_GROUPS).getFileMetaData
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
index 77e90b82849..dadcb61173a 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
@@ -19,6 +19,9 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.FileSplit
 import org.apache.hadoop.mapreduce._
@@ -70,16 +73,18 @@ class Spark34ParquetReader(enableVectorizedReader: Boolean,
    * Read an individual parquet file
    * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark 
v3.4.2 adapted here
    *
-   * @param file            parquet file to read
-   * @param requiredSchema  desired output schema of the data
-   * @param partitionSchema schema of the partition columns. Partition values 
will be appended to the end of every row
-   * @param filters         filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
-   * @param sharedConf      the hadoop conf
+   * @param file               parquet file to read
+   * @param requiredSchema     desired output schema of the data
+   * @param partitionSchema    schema of the partition columns. Partition 
values will be appended to the end of every row
+   * @param internalSchemaOpt  option of internal schema for schema.on.read
+   * @param filters            filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
+   * @param sharedConf         the hadoop conf
    * @return iterator of rows read from the file output type says 
[[InternalRow]] but could be [[ColumnarBatch]]
    */
   protected def doRead(file: PartitionedFile,
                       requiredSchema: StructType,
                       partitionSchema: StructType,
+                      internalSchemaOpt: 
org.apache.hudi.common.util.Option[InternalSchema],
                       filters: Seq[Filter],
                       sharedConf: Configuration): Iterator[InternalRow] = {
     assert(file.partitionValues.numFields == partitionSchema.size)
@@ -87,7 +92,8 @@ class Spark34ParquetReader(enableVectorizedReader: Boolean,
     val filePath = file.toPath
     val split = new FileSplit(filePath, file.start, file.length, 
Array.empty[String])
 
-    val schemaEvolutionUtils = new 
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema, 
partitionSchema)
+    val schemaEvolutionUtils = new 
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
+      partitionSchema, internalSchemaOpt)
 
     lazy val footerFileMetaData =
       ParquetFooterReader.readFooter(sharedConf, filePath, 
SKIP_ROW_GROUPS).getFileMetaData
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
index 39c2906c0e5..476ade4fb89 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
@@ -19,6 +19,9 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -70,24 +73,27 @@ class Spark35ParquetReader(enableVectorizedReader: Boolean,
    * Read an individual parquet file
    * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark 
v3.5.1 adapted here
    *
-   * @param file            parquet file to read
-   * @param requiredSchema  desired output schema of the data
-   * @param partitionSchema schema of the partition columns. Partition values 
will be appended to the end of every row
-   * @param filters         filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
-   * @param sharedConf      the hadoop conf
+   * @param file               parquet file to read
+   * @param requiredSchema     desired output schema of the data
+   * @param partitionSchema    schema of the partition columns. Partition 
values will be appended to the end of every row
+   * @param internalSchemaOpt  option of internal schema for schema.on.read
+   * @param filters            filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
+   * @param sharedConf         the hadoop conf
    * @return iterator of rows read from the file output type says 
[[InternalRow]] but could be [[ColumnarBatch]]
    */
-  protected def doRead(file: PartitionedFile,
-                       requiredSchema: StructType,
-                       partitionSchema: StructType,
-                       filters: Seq[Filter],
-                       sharedConf: Configuration): Iterator[InternalRow] = {
+  override protected def doRead(file: PartitionedFile,
+                                requiredSchema: StructType,
+                                partitionSchema: StructType,
+                                internalSchemaOpt: 
org.apache.hudi.common.util.Option[InternalSchema],
+                                filters: scala.Seq[Filter],
+                                sharedConf: Configuration): 
Iterator[InternalRow] = {
     assert(file.partitionValues.numFields == partitionSchema.size)
 
     val filePath = file.toPath
     val split = new FileSplit(filePath, file.start, file.length, 
Array.empty[String])
 
-    val schemaEvolutionUtils = new 
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema, 
partitionSchema)
+    val schemaEvolutionUtils = new 
Spark32PlusParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchema,
+      partitionSchema, internalSchemaOpt)
 
     val fileFooter = if (enableVectorizedReader) {
       // When there are vectorized reads, we can avoid reading the footer 
twice by reading

Reply via email to