xiarixiaoyao commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r853677692


##########
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala:
##########
@@ -17,279 +17,301 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.net.URI
-import java.util
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.{InternalSchemaCache, StringUtils}
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, 
ParquetRecordReader}
-
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.AvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import 
org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat.{createParquetFilters,
 pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, 
PartitionedFile, RecordReaderIterator}
-import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, 
StructType}
 import org.apache.spark.util.SerializableConfiguration
 
-class Spark312HoodieParquetFileFormat extends ParquetFileFormat {
-
-  // reference ParquetFileFormat from spark project
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
-    if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, 
"").isEmpty) {
-      // fallback to origin parquet File read
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, 
partitionSchema, requiredSchema, filters, options, hadoopConf)
-    } else {
-      hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[ParquetReadSupport].getName)
-      hadoopConf.set(
-        ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        ParquetWriteSupport.SPARK_ROW_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        SQLConf.SESSION_LOCAL_TIMEZONE.key,
-        sparkSession.sessionState.conf.sessionLocalTimeZone)
-      hadoopConf.setBoolean(
-        SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
-        sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
-      hadoopConf.setBoolean(
-        SQLConf.CASE_SENSITIVE.key,
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
-      ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
-
-      // Sets flags for `ParquetToSparkSchemaConverter`
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_BINARY_AS_STRING.key,
-        sparkSession.sessionState.conf.isParquetBinaryAsString)
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-        sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
-      // for dataSource v1, we have no method to do project for spark physical 
plan.
-      // it's safe to do cols project here.
-      val internalSchemaString = 
hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-      val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-      if (querySchemaOption.isPresent && !requiredSchema.isEmpty) {
-        val prunedSchema = 
SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema,
 querySchemaOption.get())
-        hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, 
SerDeHelper.toJson(prunedSchema))
+import java.net.URI
+
+
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding 
Spark-specific behavior
+ * that's not possible to customize in any other way
+ *
+ * NOTE: This is a version of [[ParquetFileFormat]] impl from Spark 3.1.2 w/ 
the following changes applied to it:
+ * <ol>
+ *   <li>Avoiding appending partition values to the rows read from the data 
file</li>
+ *   <li>Schema on-read</li>
+ * </ol>
+ */
+class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: 
Boolean) extends ParquetFileFormat {
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[ParquetReadSupport].getName)
+    hadoopConf.set(
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      SQLConf.SESSION_LOCAL_TIMEZONE.key,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(
+      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.CASE_SENSITIVE.key,
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sparkSession.sessionState.conf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+
+    val internalSchemaStr = 
hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+    // For Spark DataSource v1, there's no Physical Plan projection/schema 
pruning w/in Spark itself,
+    // therefore it's safe to do schema projection here
+    if (!isNullOrEmpty(internalSchemaStr)) {
+      val prunedInternalSchemaStr =
+        pruneInternalSchema(internalSchemaStr, requiredSchema)
+      hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, 
prunedInternalSchemaStr)
+    }
+
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
+
+    // TODO: if you move this into the closure it reverts to the default 
values.
+    // If true, enable using the custom RecordReader for parquet. This only 
works for
+    // a subset of the types (no complex types).
+    val resultSchema = StructType(partitionSchema.fields ++ 
requiredSchema.fields)
+    val sqlConf = sparkSession.sessionState.conf
+    val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
+    val enableVectorizedReader: Boolean =
+      sqlConf.parquetVectorizedReaderEnabled &&
+        resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+    val timestampConversion: Boolean = 
sqlConf.isParquetINT96TimestampConversion
+    val capacity = sqlConf.parquetVectorizedReaderBatchSize
+    val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
+    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+    val returningBatch = supportBatch(sparkSession, resultSchema)
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
+    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownInFilterThreshold = 
sqlConf.parquetFilterPushDownInFilterThreshold
+    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+
+    (file: PartitionedFile) => {
+      assert(!shouldAppendPartitionValues || file.partitionValues.numFields == 
partitionSchema.size)
+
+      val filePath = new Path(new URI(file.filePath))
+      val split =
+        new org.apache.parquet.hadoop.ParquetInputSplit(
+          filePath,
+          file.start,
+          file.start + file.length,
+          file.length,
+          Array.empty,
+          null)
+
+      val sharedConf = broadcastedHadoopConf.value.value
+
+      // Fetch internal schema
+      val internalSchemaStr = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+      // Internal schema has to be pruned at this point
+      val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
+
+      val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && 
querySchemaOption.isPresent
+
+      val tablePath = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
+      val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
+      val fileSchema = if (shouldUseInternalSchema) {
+        val validCommits = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
+        InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, 
tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+      } else {
+        null
       }
-      val broadcastedHadoopConf =
-        sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
-
-      // TODO: if you move this into the closure it reverts to the default 
values.
-      // If true, enable using the custom RecordReader for parquet. This only 
works for
-      // a subset of the types (no complex types).
-      val resultSchema = StructType(partitionSchema.fields ++ 
requiredSchema.fields)
-      val sqlConf = sparkSession.sessionState.conf
-      val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
-      val enableVectorizedReader: Boolean =
-        sqlConf.parquetVectorizedReaderEnabled &&
-          resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
-      val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
-      val timestampConversion: Boolean = 
sqlConf.isParquetINT96TimestampConversion
-      val capacity = sqlConf.parquetVectorizedReaderBatchSize
-      val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
-      // Whole stage codegen (PhysicalRDD) is able to deal with batches 
directly
-      val returningBatch = supportBatch(sparkSession, resultSchema)
-      val pushDownDate = sqlConf.parquetFilterPushDownDate
-      val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
-      val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-      val pushDownStringStartWith = 
sqlConf.parquetFilterPushDownStringStartWith
-      val pushDownInFilterThreshold = 
sqlConf.parquetFilterPushDownInFilterThreshold
-      val isCaseSensitive = sqlConf.caseSensitiveAnalysis
-
-      (file: PartitionedFile) => {
-        assert(file.partitionValues.numFields == partitionSchema.size)
-        val filePath = new Path(new URI(file.filePath))
-        val split =
-          new org.apache.parquet.hadoop.ParquetInputSplit(
-            filePath,
-            file.start,
-            file.start + file.length,
-            file.length,
-            Array.empty,
-            null)
-        val sharedConf = broadcastedHadoopConf.value.value
-        // do deal with internalSchema
-        val internalSchemaString = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-        // querySchema must be a pruned schema.
-        val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-        val internalSchemaChangeEnabled = if (internalSchemaString.isEmpty || 
!querySchemaOption.isPresent) false else true
-        val tablePath = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
-        val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
-        val fileSchema = if (internalSchemaChangeEnabled) {
-          val validCommits = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-          InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, 
tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+
+      lazy val footerFileMetaData =
+        ParquetFileReader.readFooter(sharedConf, filePath, 
SKIP_ROW_GROUPS).getFileMetaData
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
+      // Try to push down filters when filter push-down is enabled.
+      val pushed = if (enableParquetFilterPushDown) {
+        val parquetSchema = footerFileMetaData.getSchema
+        val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
+          createParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive,
+            datetimeRebaseMode)
         } else {
-          // this should not happened, searchSchemaAndCache will deal with 
correctly.
-          null
+          createParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive)
         }
+        filters.map(rebuildFilterFromParquet(_, fileSchema, 
querySchemaOption.orElse(null)))
+          // Collects all converted Parquet filter predicates. Notice that not 
all predicates can be
+          // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap`
+          // is used here.
+          .flatMap(parquetFilters.createFilter)
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
 
-        lazy val footerFileMetaData =
-          ParquetFileReader.readFooter(sharedConf, filePath, 
SKIP_ROW_GROUPS).getFileMetaData
-        val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-          footerFileMetaData.getKeyValueMetaData.get,
-          SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
-        // Try to push down filters when filter push-down is enabled.
-        val pushed = if (enableParquetFilterPushDown) {
-          val parquetSchema = footerFileMetaData.getSchema
-          val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
-            Spark312HoodieParquetFileFormat.createParquetFilters(
-              parquetSchema,
-              pushDownDate,
-              pushDownTimestamp,
-              pushDownDecimal,
-              pushDownStringStartWith,
-              pushDownInFilterThreshold,
-              isCaseSensitive,
-              datetimeRebaseMode)
-          } else {
-            Spark312HoodieParquetFileFormat.createParquetFilters(
-              parquetSchema,
-              pushDownDate,
-              pushDownTimestamp,
-              pushDownDecimal,
-              pushDownStringStartWith,
-              pushDownInFilterThreshold,
-              isCaseSensitive)
-          }
-          
filters.map(Spark312HoodieParquetFileFormat.rebuildFilterFromParquet(_, 
fileSchema, querySchemaOption.get()))
-            // Collects all converted Parquet filter predicates. Notice that 
not all predicates can be
-            // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap`
-            // is used here.
-            .flatMap(parquetFilters.createFilter(_))
-            .reduceOption(FilterApi.and)
+      // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions 
to int96 timestamps
+      // *only* if the file was created by something other than "parquet-mr", 
so check the actual
+      // writer here for this file.  We have to do this per-file, as each file 
in the table may
+      // have different writers.
+      // Define isCreatedByParquetMr as function to avoid unnecessary parquet 
footer reads.
+      def isCreatedByParquetMr: Boolean =
+        footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+      val convertTz =
+        if (timestampConversion && !isCreatedByParquetMr) {
+          
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
         } else {
           None
         }
 
-        // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone 
conversions to int96 timestamps'
-        // *only* if the file was created by something other than 
"parquet-mr", so check the actual
-        // writer here for this file.  We have to do this per-file, as each 
file in the table may
-        // have different writers.
-        // Define isCreatedByParquetMr as function to avoid unnecessary 
parquet footer reads.
-        def isCreatedByParquetMr: Boolean =
-          footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
-
-        val convertTz =
-          if (timestampConversion && !isCreatedByParquetMr) {
-            
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
-          } else {
-            None
-          }
-        val int96RebaseMode = DataSourceUtils.int96RebaseMode(
-          footerFileMetaData.getKeyValueMetaData.get,
-          
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
-
-        val attemptId = new TaskAttemptID(new TaskID(new JobID(), 
TaskType.MAP, 0), 0)
-        // use new conf
-        val hadoopAttempConf = new 
Configuration(broadcastedHadoopConf.value.value)
-        //
-        // reset request schema
-        var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] 
= new java.util.HashMap()
-        if (internalSchemaChangeEnabled) {
-          val mergedInternalSchema = new InternalSchemaMerger(fileSchema, 
querySchemaOption.get(), true, true).mergeSchema()
-          val mergedSchema = 
SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
-          typeChangeInfos = 
SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), 
mergedInternalSchema)
-          hadoopAttempConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, 
mergedSchema.json)
-        }
-        val hadoopAttemptContext =
-          new TaskAttemptContextImpl(hadoopAttempConf, attemptId)
+      val int96RebaseMode = DataSourceUtils.int96RebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
 
-        // Try to push down filters when filter push-down is enabled.
-        // Notice: This push-down is RowGroups level, not individual records.
-        if (pushed.isDefined) {
-          
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
-        }
-        val taskContext = Option(TaskContext.get())
-        if (enableVectorizedReader) {
-          val vectorizedReader = new 
Spark312HoodieVectorizedParquetRecordReader(
-            convertTz.orNull,
-            datetimeRebaseMode.toString,
-            int96RebaseMode.toString,
-            enableOffHeapColumnVector && taskContext.isDefined,
-            capacity, typeChangeInfos)
-          val iter = new RecordReaderIterator(vectorizedReader)
-          // SPARK-23457 Register a task completion listener before 
`initialization`.
-          taskContext.foreach(_.addTaskCompletionListener[Unit](_ => 
iter.close()))
-          vectorizedReader.initialize(split, hadoopAttemptContext)
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
+
+      // Clone new conf
+      val hadoopAttemptConf = new 
Configuration(broadcastedHadoopConf.value.value)
+      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = 
new java.util.HashMap()
+      if (shouldUseInternalSchema) {
+        val mergedInternalSchema = new InternalSchemaMerger(fileSchema, 
querySchemaOption.get(), true, true).mergeSchema()
+        val mergedSchema = 
SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
+        typeChangeInfos = 
SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), 
mergedInternalSchema)
+        hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, 
mergedSchema.json)
+      }
+      val hadoopAttemptContext =
+        new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
+
+      // Try to push down filters when filter push-down is enabled.
+      // Notice: This push-down is RowGroups level, not individual records.
+      if (pushed.isDefined) {
+        
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
+      }
+      val taskContext = Option(TaskContext.get())
+      if (enableVectorizedReader) {
+        val vectorizedReader = new Spark312HoodieVectorizedParquetRecordReader(
+          convertTz.orNull,

Review Comment:
   Please use `shouldUseInternalSchema` to fall back to the original 
`VectorizedParquetRecordReader` when it is false. 
`Spark312HoodieVectorizedParquetRecordReader` is only needed for schema evolution.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to