This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 23f657db3b9 [HUDI-6635] Add a new Hudi Parquet File Format supporting 
MOR and Bootstrap queries in Spark (#9276)
23f657db3b9 is described below

commit 23f657db3b92e579ed8a32deb2743da5aae34fce
Author: Jon Vexler <[email protected]>
AuthorDate: Sun Aug 6 21:06:12 2023 -0400

    [HUDI-6635] Add a new Hudi Parquet File Format supporting MOR and Bootstrap 
queries in Spark (#9276)
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../spark/sql/HoodieCatalystPlansUtils.scala       |  10 +
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |  10 +-
 ...org.apache.spark.sql.sources.DataSourceRegister |   2 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   9 +
 .../main/scala/org/apache/hudi/DefaultSource.scala |  28 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |   6 +-
 .../org/apache/hudi/HoodieDataSourceHelper.scala   |   2 +-
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |  55 +++-
 .../org/apache/hudi/IncrementalRelation.scala      |   4 +-
 .../src/main/scala/org/apache/hudi/Iterators.scala |  47 ++-
 .../hudi/NewHoodieParquetFileFormatUtils.scala     | 212 ++++++++++++
 .../apache/hudi/PartitionFileSliceMapping.scala    |  77 +++++
 .../apache/hudi/SparkHoodieTableFileIndex.scala    |   4 +-
 ...t.scala => LegacyHoodieParquetFileFormat.scala} |  13 +-
 .../parquet/NewHoodieParquetFileFormat.scala       | 362 +++++++++++++++++++++
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |   3 +
 .../apache/hudi/functional/TestBootstrapRead.java  | 250 +-------------
 ...otstrapRead.java => TestBootstrapReadBase.java} |  87 ++---
 .../functional/TestNewHoodieParquetFileFormat.java | 138 ++++++++
 .../spark/sql/HoodieSpark2CatalystPlanUtils.scala  |  16 +-
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |  13 +-
 ... => Spark24LegacyHoodieParquetFileFormat.scala} |   2 +-
 .../spark/sql/adapter/BaseSpark3Adapter.scala      |   5 +
 .../spark/sql/HoodieSpark30CatalystPlanUtils.scala |  16 +-
 .../apache/spark/sql/adapter/Spark3_0Adapter.scala |   6 +-
 ... => Spark30LegacyHoodieParquetFileFormat.scala} |   6 +-
 .../spark/sql/HoodieSpark31CatalystPlanUtils.scala |  16 +-
 .../apache/spark/sql/adapter/Spark3_1Adapter.scala |   6 +-
 ... => Spark31LegacyHoodieParquetFileFormat.scala} |   6 +-
 .../spark/sql/HoodieSpark32CatalystPlanUtils.scala |  16 +-
 .../apache/spark/sql/adapter/Spark3_2Adapter.scala |   8 +-
 ... => Spark32LegacyHoodieParquetFileFormat.scala} |  11 +-
 .../spark/sql/HoodieSpark33CatalystPlanUtils.scala |  16 +-
 .../apache/spark/sql/adapter/Spark3_3Adapter.scala |   8 +-
 ... => Spark33LegacyHoodieParquetFileFormat.scala} |   6 +-
 .../spark/sql/HoodieSpark34CatalystPlanUtils.scala |  16 +-
 .../apache/spark/sql/adapter/Spark3_4Adapter.scala |  10 +-
 ... => Spark34LegacyHoodieParquetFileFormat.scala} |   6 +-
 rfc/README.md                                      |   2 +-
 39 files changed, 1104 insertions(+), 406 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index 88ac1693318..58789681c54 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.internal.SQLConf
 
 trait HoodieCatalystPlansUtils {
@@ -77,6 +78,15 @@ trait HoodieCatalystPlansUtils {
    */
   def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan, 
LogicalPlan, Expression)]
 
+
+  /**
+   * Spark requires file formats to append the partition path fields to the 
end of the schema.
+   * For tables where the partition path fields are not at the end of the 
schema, we don't want
+   * to return the schema in the wrong order when they do a query like "select 
*". To fix this
+   * behavior, we apply a projection onto FileScan when the file format is 
NewHudiParquetFileFormat
+   */
+  def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan
+
   /**
    * Decomposes [[InsertIntoStatement]] into its arguments allowing to 
accommodate for API
    * changes in Spark 3.3
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 782a49ac189..041beba95df 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -26,11 +26,10 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, 
HoodieAvroSchemaConverters, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical.{Command, Join, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.datasources._
@@ -38,6 +37,7 @@ import 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.parser.HoodieExtendedParserInterface
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.{DataType, Metadata, StructType}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
 import org.apache.spark.storage.StorageLevel
 
 import java.util.{Locale, TimeZone}
@@ -165,7 +165,9 @@ trait SparkAdapter extends Serializable {
   /**
    * Create instance of [[ParquetFileFormat]]
    */
-  def createHoodieParquetFileFormat(appendPartitionValues: Boolean): 
Option[ParquetFileFormat]
+  def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): 
Option[ParquetFileFormat]
+
+  def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int): 
ColumnarBatch
 
   /**
    * Create instance of [[InterpretedPredicate]]
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 556b0feef1c..f4029445f61 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -17,4 +17,4 @@
 
 
 org.apache.hudi.DefaultSource
-org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
\ No newline at end of file
+org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 59c2a60a3ad..6e14e262d2c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -87,6 +87,15 @@ object DataSourceReadOptions {
       s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL}) 
or skip merging altogether" +
       s"${REALTIME_SKIP_MERGE_OPT_VAL}")
 
+  val USE_NEW_HUDI_PARQUET_FILE_FORMAT: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.read.use.new.parquet.file.format")
+    .defaultValue("false")
+    .markAdvanced()
+    .sinceVersion("0.14.0")
+    .withDocumentation("Read using the new Hudi parquet file format. The new 
Hudi parquet file format is " +
+      "introduced as an experimental feature in 0.14.0. Currently, the new 
Hudi parquet file format only applies " +
+      "to bootstrap and MOR queries. Schema evolution is also not supported by 
the new file format.")
+
   val READ_PATHS: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.read.paths")
     .noDefaultValue()
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 3e5cf351ba1..5ecf250eaab 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -246,6 +246,16 @@ object DefaultSource {
     } else if (isCdcQuery) {
       CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
     } else {
+      lazy val newHudiFileFormatUtils = if 
(parameters.getOrElse(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key,
+        USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean && (globPaths 
== null || globPaths.isEmpty)
+        && parameters.getOrElse(REALTIME_MERGE.key(), 
REALTIME_MERGE.defaultValue())
+        .equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL)) {
+        val formatUtils = new NewHoodieParquetFileFormatUtils(sqlContext, 
metaClient, parameters, userSchema)
+        if (formatUtils.hasSchemaOnRead) Option.empty else Some(formatUtils)
+      } else {
+        Option.empty
+      }
+
       (tableType, queryType, isBootstrappedTable) match {
         case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
              (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
@@ -256,16 +266,28 @@ object DefaultSource {
           new IncrementalRelation(sqlContext, parameters, userSchema, 
metaClient)
 
         case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
-          new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, 
globPaths, userSchema)
+          if (newHudiFileFormatUtils.isEmpty) {
+            new MergeOnReadSnapshotRelation(sqlContext, parameters, 
metaClient, globPaths, userSchema)
+          } else {
+            newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true, 
isBootstrap = false)
+          }
 
         case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
           new MergeOnReadIncrementalRelation(sqlContext, parameters, 
metaClient, userSchema)
 
         case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
-          new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, 
metaClient, parameters)
+          if (newHudiFileFormatUtils.isEmpty) {
+            new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, 
metaClient, parameters)
+          } else {
+            newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true, 
isBootstrap = true)
+          }
 
         case (_, _, true) =>
-          resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, 
metaClient, parameters)
+          if (newHudiFileFormatUtils.isEmpty) {
+            resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, 
metaClient, parameters)
+          } else {
+            newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = false, 
isBootstrap = true)
+          }
 
         case (_, _, _) =>
           throw new HoodieException(s"Invalid query type : $queryType for 
tableType: $tableType," +
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 2f9579d629e..fea7781f84d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -56,7 +56,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, 
SubqueryExpression
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
-import 
org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, 
ParquetFileFormat}
+import 
org.apache.spark.sql.execution.datasources.parquet.{LegacyHoodieParquetFileFormat,
 ParquetFileFormat}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
 import org.apache.spark.sql.types.StructType
@@ -241,8 +241,8 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
       case HoodieFileFormat.PARQUET =>
         // We're delegating to Spark to append partition values to every row 
only in cases
         // when these corresponding partition-values are not persisted w/in 
the data file itself
-        val parquetFileFormat = 
sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
-        (parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID)
+        val parquetFileFormat = 
sparkAdapter.createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
+        (parquetFileFormat, LegacyHoodieParquetFileFormat.FILE_FORMAT_ID)
     }
 
   /**
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
index 49db12b792c..eb8ddfdf870 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
@@ -52,7 +52,7 @@ object HoodieDataSourceHelper extends PredicateHelper with 
SparkAdapterSupport {
                                              options: Map[String, String],
                                              hadoopConf: Configuration,
                                              appendPartitionValues: Boolean = 
false): PartitionedFile => Iterator[InternalRow] = {
-    val parquetFileFormat: ParquetFileFormat = 
sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
+    val parquetFileFormat: ParquetFileFormat = 
sparkAdapter.createLegacyHoodieParquetFileFormat(appendPartitionValues).get
     val readParquetFile: PartitionedFile => Iterator[Any] = 
parquetFileFormat.buildReaderWithPartitionValues(
       sparkSession = sparkSession,
       dataSchema = dataSchema,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 9791d39e280..964ef970c91 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -18,7 +18,7 @@
 package org.apache.hudi
 
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, 
collectReferencedColumns, getConfigProperties}
+import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, 
collectReferencedColumns, convertFilterForTimestampKeyGenerator, 
getConfigProperties}
 import org.apache.hudi.HoodieSparkConfUtils.getConfigValue
 import 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
 TIMESTAMP_OUTPUT_DATE_FORMAT}
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
@@ -100,6 +100,8 @@ case class HoodieFileIndex(spark: SparkSession,
 
   override def rootPaths: Seq[Path] = getQueryPaths.asScala
 
+  var shouldBroadcast: Boolean = false
+
   /**
    * Returns the FileStatus for all the base files (excluding log files). This 
should be used only for
    * cases where Spark directly fetches the list of files via HoodieFileIndex 
or for read optimized query logic
@@ -142,26 +144,49 @@ case class HoodieFileIndex(spark: SparkSession,
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: 
Seq[Expression]): Seq[PartitionDirectory] = {
     val prunedPartitionsAndFilteredFileSlices = filterFileSlices(dataFilters, 
partitionFilters).map {
       case (partitionOpt, fileSlices) =>
-        val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
-          val baseFileStatusOpt = 
getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
-          val logFilesStatus = if (includeLogFiles) {
-            
fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, 
FileStatus](lf => lf.getFileStatus))
+        if (shouldBroadcast) {
+          val baseFileStatusesAndLogFileOnly: Seq[FileStatus] = 
fileSlices.map(slice => {
+            if (slice.getBaseFile.isPresent) {
+              slice.getBaseFile.get().getFileStatus
+            } else if (slice.getLogFiles.findAny().isPresent) {
+              slice.getLogFiles.findAny().get().getFileStatus
+            } else {
+              null
+            }
+          }).filter(slice => slice != null)
+          val c = fileSlices.filter(f => f.getLogFiles.findAny().isPresent
+            || (f.getBaseFile.isPresent && 
f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
+            foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> 
f) }
+          if (c.nonEmpty) {
+            PartitionDirectory(new 
PartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), 
spark.sparkContext.broadcast(c)), baseFileStatusesAndLogFileOnly)
           } else {
-            java.util.stream.Stream.empty()
+            PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), 
baseFileStatusesAndLogFileOnly)
           }
-          val files = 
logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
-          baseFileStatusOpt.foreach(f => files.append(f))
-          files
-        })
 
-        PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), 
allCandidateFiles)
+        } else {
+          val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
+            val baseFileStatusOpt = 
getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
+            val logFilesStatus = if (includeLogFiles) {
+              
fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, 
FileStatus](lf => lf.getFileStatus))
+            } else {
+              java.util.stream.Stream.empty()
+            }
+            val files = 
logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
+            baseFileStatusOpt.foreach(f => files.append(f))
+            files
+          })
+          PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), 
allCandidateFiles)
+        }
     }
 
     hasPushedDownPartitionPredicates = true
 
     if (shouldReadAsPartitionedTable()) {
       prunedPartitionsAndFilteredFileSlices
-    } else {
+    } else if (shouldBroadcast) {
+      assert(partitionSchema.isEmpty)
+      prunedPartitionsAndFilteredFileSlices
+    }else {
       Seq(PartitionDirectory(InternalRow.empty, 
prunedPartitionsAndFilteredFileSlices.flatMap(_.files)))
     }
   }
@@ -244,7 +269,11 @@ case class HoodieFileIndex(spark: SparkSession,
     // Prune the partition path by the partition filters
     // NOTE: Non-partitioned tables are assumed to consist from a single 
partition
     //       encompassing the whole table
-    val prunedPartitions = listMatchingPartitionPaths (partitionFilters)
+    val prunedPartitions = if (shouldBroadcast) {
+      
listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient, 
partitionFilters))
+    } else {
+      listMatchingPartitionPaths(partitionFilters)
+    }
     getInputFileSlices(prunedPartitions: _*).asScala.toSeq.map(
       { case (partition, fileSlices) => (Option.apply(partition), 
fileSlices.asScala) })
   }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 6daa84e1b42..79dad997b4c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -37,7 +37,7 @@ import org.apache.hudi.internal.schema.utils.SerDeHelper
 import org.apache.hudi.table.HoodieSparkTable
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.rdd.RDD
-import 
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
+import 
org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat
 import org.apache.spark.sql.sources.{BaseRelation, TableScan}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SQLContext}
@@ -206,7 +206,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
       
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH,
 metaClient.getBasePath)
       
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST,
 validCommits)
       val formatClassName = metaClient.getTableConfig.getBaseFileFormat match {
-        case HoodieFileFormat.PARQUET => HoodieParquetFileFormat.FILE_FORMAT_ID
+        case HoodieFileFormat.PARQUET => 
LegacyHoodieParquetFileFormat.FILE_FORMAT_ID
         case HoodieFileFormat.ORC => "orc"
       }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index 942ade81cc5..054fcc799d7 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.LogFileIterator._
 import org.apache.hudi.common.config.{HoodieCommonConfig, 
HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.engine.{EngineType, HoodieLocalEngineContext}
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
+import org.apache.hudi.common.fs.FSUtils.{buildInlineConf, 
getRelativePartitionPath}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
@@ -61,15 +61,26 @@ import scala.util.Try
 class LogFileIterator(logFiles: List[HoodieLogFile],
                       partitionPath: Path,
                       tableSchema: HoodieTableSchema,
-                      requiredSchema: HoodieTableSchema,
+                      requiredStructTypeSchema: StructType,
+                      requiredAvroSchema: Schema,
                       tableState: HoodieTableState,
                       config: Configuration)
   extends CachingIterator[InternalRow] with AvroDeserializerSupport {
 
+  def this(logFiles: List[HoodieLogFile],
+            partitionPath: Path,
+            tableSchema: HoodieTableSchema,
+            requiredSchema: HoodieTableSchema,
+            tableState: HoodieTableState,
+            config: Configuration) {
+    this(logFiles, partitionPath, tableSchema, requiredSchema.structTypeSchema,
+      new Schema.Parser().parse(requiredSchema.avroSchemaStr), tableState, 
config)
+  }
   def this(split: HoodieMergeOnReadFileSplit,
            tableSchema: HoodieTableSchema,
            requiredSchema: HoodieTableSchema,
-           tableState: HoodieTableState, config: Configuration) {
+           tableState: HoodieTableState,
+           config: Configuration) {
     this(split.logFiles, getPartitionPath(split), tableSchema, requiredSchema, 
tableState, config)
   }
   private val maxCompactionMemoryInBytes: Long = 
getMaxCompactionMemoryInBytes(new JobConf(config))
@@ -83,8 +94,8 @@ class LogFileIterator(logFiles: List[HoodieLogFile],
     }
     .getOrElse(new TypedProperties())
 
-  protected override val avroSchema: Schema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
-  protected override val structTypeSchema: StructType = 
requiredSchema.structTypeSchema
+  protected override val avroSchema: Schema = requiredAvroSchema
+  protected override val structTypeSchema: StructType = 
requiredStructTypeSchema
 
   protected val logFileReaderAvroSchema: Schema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
   protected val logFileReaderStructType: StructType = 
tableSchema.structTypeSchema
@@ -142,20 +153,22 @@ class LogFileIterator(logFiles: List[HoodieLogFile],
  * Base file as well as all of the Delta Log files simply returning 
concatenation of these streams, while not
  * performing any combination/merging of the records w/ the same primary keys 
(ie producing duplicates potentially)
  */
-private class SkipMergeIterator(logFiles: List[HoodieLogFile],
+class SkipMergeIterator(logFiles: List[HoodieLogFile],
                                 partitionPath: Path,
                                 baseFileIterator: Iterator[InternalRow],
                                 readerSchema: StructType,
                                 dataSchema: HoodieTableSchema,
-                                requiredSchema: HoodieTableSchema,
+                                requiredStructTypeSchema: StructType,
+                                requiredAvroSchema: Schema,
                                 tableState: HoodieTableState,
                                 config: Configuration)
-  extends LogFileIterator(logFiles, partitionPath, dataSchema, requiredSchema, 
tableState, config) {
+  extends LogFileIterator(logFiles, partitionPath, dataSchema, 
requiredStructTypeSchema, requiredAvroSchema, tableState, config) {
 
   def this(split: HoodieMergeOnReadFileSplit, baseFileReader: BaseFileReader, 
dataSchema: HoodieTableSchema,
            requiredSchema: HoodieTableSchema, tableState: HoodieTableState, 
config: Configuration) {
     this(split.logFiles, getPartitionPath(split), 
baseFileReader(split.dataFile.get),
-      baseFileReader.schema, dataSchema, requiredSchema, tableState, config)
+      baseFileReader.schema, dataSchema, requiredSchema.structTypeSchema,
+      new Schema.Parser().parse(requiredSchema.avroSchemaStr), tableState, 
config)
   }
 
   private val requiredSchemaProjection = 
generateUnsafeProjection(readerSchema, structTypeSchema)
@@ -181,11 +194,23 @@ class RecordMergingFileIterator(logFiles: 
List[HoodieLogFile],
                                 baseFileIterator: Iterator[InternalRow],
                                 readerSchema: StructType,
                                 dataSchema: HoodieTableSchema,
-                                requiredSchema: HoodieTableSchema,
+                                requiredStructTypeSchema: StructType,
+                                requiredAvroSchema: Schema,
                                 tableState: HoodieTableState,
                                 config: Configuration)
-  extends LogFileIterator(logFiles, partitionPath, dataSchema, requiredSchema, 
tableState, config) {
+  extends LogFileIterator(logFiles, partitionPath, dataSchema, 
requiredStructTypeSchema, requiredAvroSchema, tableState, config) {
 
+  def this(logFiles: List[HoodieLogFile],
+           partitionPath: Path,
+           baseFileIterator: Iterator[InternalRow],
+           readerSchema: StructType,
+           dataSchema: HoodieTableSchema,
+           requiredSchema: HoodieTableSchema,
+           tableState: HoodieTableState,
+           config: Configuration) {
+    this(logFiles, partitionPath, baseFileIterator, readerSchema, dataSchema, 
requiredSchema.structTypeSchema,
+      new Schema.Parser().parse(requiredSchema.avroSchemaStr), tableState, 
config)
+  }
   def this(split: HoodieMergeOnReadFileSplit, baseFileReader: BaseFileReader, 
dataSchema: HoodieTableSchema,
            requiredSchema: HoodieTableSchema, tableState: HoodieTableState, 
config: Configuration) {
     this(split.logFiles, getPartitionPath(split), 
baseFileReader(split.dataFile.get),
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
new file mode 100644
index 00000000000..5dd85c973b6
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.HoodieBaseRelation._
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.common.config.ConfigProperty
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieRecord
+import 
org.apache.hudi.common.model.HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
+import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import 
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, 
HadoopFsRelation}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{SQLContext, SparkSession}
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+class NewHoodieParquetFileFormatUtils(val sqlContext: SQLContext,
+                                      val metaClient: HoodieTableMetaClient,
+                                      val optParamsInput: Map[String, String],
+                                      private val schemaSpec: 
Option[StructType]) extends SparkAdapterSupport {
+  protected val sparkSession: SparkSession = sqlContext.sparkSession
+
+  protected val optParams: Map[String, String] = optParamsInput.filter(kv => 
!kv._1.equals(DATA_QUERIES_ONLY.key()))
+  protected def tableName: String = metaClient.getTableConfig.getTableName
+
+  protected lazy val resolver: Resolver = 
sparkSession.sessionState.analyzer.resolver
+
+  private lazy val metaFieldNames = 
HoodieRecord.HOODIE_META_COLUMNS.asScala.toSet
+
+  protected lazy val conf: Configuration = new 
Configuration(sqlContext.sparkContext.hadoopConfiguration)
+  protected lazy val jobConf = new JobConf(conf)
+
+  protected lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig
+
+  protected lazy val basePath: Path = metaClient.getBasePathV2
+
+  protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: 
Option[InternalSchema]) = {
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(optParams, 
sparkSession)) {
+      None
+    } else {
+      Try {
+        
specifiedQueryTimestamp.map(schemaResolver.getTableInternalSchemaFromCommitMetadata)
+          .getOrElse(schemaResolver.getTableInternalSchemaFromCommitMetadata)
+      } match {
+        case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
+        case Failure(e) =>
+          None
+      }
+    }
+
+    val (name, namespace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
+    val avroSchema = internalSchemaOpt.map { is =>
+      AvroInternalSchemaConverter.convert(is, namespace + "." + name)
+    } orElse {
+      specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
+    } orElse {
+      schemaSpec.map(s => convertToAvroSchema(s, tableName))
+    } getOrElse {
+      Try(schemaResolver.getTableAvroSchema) match {
+        case Success(schema) => schema
+        case Failure(e) =>
+          throw new HoodieSchemaException("Failed to fetch schema from the 
table")
+      }
+    }
+
+    (avroSchema, internalSchemaOpt)
+  }
+
+  protected lazy val tableStructSchema: StructType = {
+    val converted = 
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+    val metaFieldMetadata = sparkAdapter.createCatalystMetadataForMetaField
+
+    // NOTE: Here we annotate meta-fields with corresponding metadata such 
that Spark (>= 3.2)
+    //       is able to recognize such fields as meta-fields
+    StructType(converted.map { field =>
+      if (metaFieldNames.exists(metaFieldName => resolver(metaFieldName, 
field.name))) {
+        field.copy(metadata = metaFieldMetadata)
+      } else {
+        field
+      }
+    })
+  }
+
+  protected lazy val preCombineFieldOpt: Option[String] =
+    Option(tableConfig.getPreCombineField)
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) 
match {
+      // NOTE: This is required to compensate for cases when empty string is 
used to stub
+      //       property value to avoid it being set with the default value
+      // TODO(HUDI-3456) cleanup
+      case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
+      case _ => None
+    }
+
+  protected def timeline: HoodieTimeline =
+  // NOTE: We're including compaction here since it's not considering a 
"commit" operation
+    metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants
+
+  protected lazy val recordKeyField: String =
+    if (tableConfig.populateMetaFields()) {
+      HoodieRecord.RECORD_KEY_METADATA_FIELD
+    } else {
+      val keyFields = tableConfig.getRecordKeyFields.get()
+      checkState(keyFields.length == 1)
+      keyFields.head
+    }
+
+  private def queryTimestamp: Option[String] =
+    
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))
+
+  protected lazy val specifiedQueryTimestamp: Option[String] =
+    optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
+      .map(HoodieSqlCommonUtils.formatQueryInstant)
+
+  private def getConfigValue(config: ConfigProperty[String],
+                             defaultValueOption: Option[String] = 
Option.empty): String = {
+    optParams.getOrElse(config.key(),
+      sqlContext.getConf(config.key(), 
defaultValueOption.getOrElse(config.defaultValue())))
+  }
+
+  protected val mergeType: String = 
optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
+    DataSourceReadOptions.REALTIME_MERGE.defaultValue)
+
+  protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
+    // Controls whether partition columns (which are the source for the 
partition path values) should
+    // be omitted from persistence in the data files. On the read path it 
affects whether partition values (values
+    // of partition columns) will be read from the data file or extracted from 
partition path
+    val shouldOmitPartitionColumns = 
metaClient.getTableConfig.shouldDropPartitionColumns && 
partitionColumns.nonEmpty
+    val shouldExtractPartitionValueFromPath =
+      
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
+        
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
+    val shouldUseBootstrapFastRead = 
optParams.getOrElse(DATA_QUERIES_ONLY.key(), "false").toBoolean
+
+    shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath || 
shouldUseBootstrapFastRead
+  }
+
+  protected lazy val mandatoryFieldsForMerging: Seq[String] =
+    Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
+
+  protected lazy val partitionColumns: Array[String] = 
tableConfig.getPartitionFields.orElse(Array.empty)
+
+  def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined
+
+  def getHadoopFsRelation(isMOR: Boolean, isBootstrap: Boolean): BaseRelation 
= {
+
+    val fileIndex = HoodieFileIndex(sparkSession, metaClient, 
Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sparkSession), 
isMOR)
+    val recordMergerImpls = 
ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList
+    val recordMergerStrategy = 
getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY,
+      Option(metaClient.getTableConfig.getRecordMergerStrategy))
+
+    val tableState = // Subset of the state of table's configuration as of at 
the time of the query
+      HoodieTableState(
+        tablePath = basePath.toString,
+        latestCommitTimestamp = queryTimestamp,
+        recordKeyField = recordKeyField,
+        preCombineFieldOpt = preCombineFieldOpt,
+        usesVirtualKeys = !tableConfig.populateMetaFields(),
+        recordPayloadClassName = tableConfig.getPayloadClass,
+        metadataConfig = fileIndex.metadataConfig,
+        recordMergerImpls = recordMergerImpls,
+        recordMergerStrategy = recordMergerStrategy
+      )
+
+    val mandatoryFields = if (isMOR) {
+      mandatoryFieldsForMerging
+    } else {
+      Seq.empty
+    }
+    fileIndex.shouldBroadcast = true
+    HadoopFsRelation(
+      location = fileIndex,
+      partitionSchema = fileIndex.partitionSchema,
+      dataSchema = fileIndex.dataSchema,
+      bucketSpec = None,
+      fileFormat = new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
+        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
isMOR, isBootstrap),
+      optParams)(sparkSession)
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
new file mode 100644
index 00000000000..c9468e2d601
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionFileSliceMapping.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.FileSlice
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+class PartitionFileSliceMapping(internalRow: InternalRow,
+                                broadcast: Broadcast[Map[String, FileSlice]]) 
extends InternalRow {
+
+  def getSlice(fileId: String): Option[FileSlice] = {
+    broadcast.value.get(fileId)
+  }
+
+  def getInternalRow: InternalRow = internalRow
+
+  override def numFields: Int = internalRow.numFields
+
+  override def setNullAt(i: Int): Unit = internalRow.setNullAt(i)
+
+  override def update(i: Int, value: Any): Unit = internalRow.update(i, value)
+
+  override def copy(): InternalRow = new 
PartitionFileSliceMapping(internalRow.copy(), broadcast)
+
+  override def isNullAt(ordinal: Int): Boolean = internalRow.isNullAt(ordinal)
+
+  override def getBoolean(ordinal: Int): Boolean = 
internalRow.getBoolean(ordinal)
+
+  override def getByte(ordinal: Int): Byte = internalRow.getByte(ordinal)
+
+  override def getShort(ordinal: Int): Short = internalRow.getShort(ordinal)
+
+  override def getInt(ordinal: Int): Int = internalRow.getInt(ordinal)
+
+  override def getLong(ordinal: Int): Long = internalRow.getLong(ordinal)
+
+  override def getFloat(ordinal: Int): Float = internalRow.getFloat(ordinal)
+
+  override def getDouble(ordinal: Int): Double = internalRow.getDouble(ordinal)
+
+  override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal = 
internalRow.getDecimal(ordinal, precision, scale)
+
+  override def getUTF8String(ordinal: Int): UTF8String = 
internalRow.getUTF8String(ordinal)
+
+  override def getBinary(ordinal: Int): Array[Byte] = 
internalRow.getBinary(ordinal)
+
+  override def getInterval(ordinal: Int): CalendarInterval = 
internalRow.getInterval(ordinal)
+
+  override def getStruct(ordinal: Int, numFields: Int): InternalRow = 
internalRow.getStruct(ordinal, numFields)
+
+  override def getArray(ordinal: Int): ArrayData = 
internalRow.getArray(ordinal)
+
+  override def getMap(ordinal: Int): MapData = internalRow.getMap(ordinal)
+
+  override def get(ordinal: Int, dataType: DataType): AnyRef = 
internalRow.get(ordinal, dataType)
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index b3d9e5659e8..d1b6df6619d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -47,7 +47,7 @@ import java.util.Collections
 import javax.annotation.concurrent.NotThreadSafe
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
-import scala.util.{Failure, Success, Try}
+import scala.util.{Success, Try}
 
 /**
  * Implementation of the [[BaseHoodieTableFileIndex]] for Spark
@@ -96,7 +96,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
       
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
     })
 
-  protected lazy val shouldFastBootstrap: Boolean = 
configProperties.getBoolean(DATA_QUERIES_ONLY.key, false)
+  protected lazy val shouldFastBootstrap = 
configProperties.getBoolean(DATA_QUERIES_ONLY.key, false)
 
   private lazy val sparkParsePartitionUtil = 
sparkAdapter.getSparkParsePartitionUtil
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
similarity index 85%
rename from 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
rename to 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
index 7f494af37af..046640c11c1 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/LegacyHoodieParquetFileFormat.scala
@@ -23,12 +23,15 @@ import org.apache.hudi.{DataSourceReadOptions, 
HoodieSparkUtils, SparkAdapterSup
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionedFile
-import 
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat.FILE_FORMAT_ID
+import 
org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat.FILE_FORMAT_ID
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{AtomicType, StructType}
 
-
-class HoodieParquetFileFormat extends ParquetFileFormat with 
SparkAdapterSupport {
+/**
+ * This legacy parquet file format implementation to support Hudi will be 
replaced by
+ * [[NewHoodieParquetFileFormat]] in the future.
+ */
+class LegacyHoodieParquetFileFormat extends ParquetFileFormat with 
SparkAdapterSupport {
 
   override def shortName(): String = FILE_FORMAT_ID
 
@@ -55,11 +58,11 @@ class HoodieParquetFileFormat extends ParquetFileFormat 
with SparkAdapterSupport
         
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
 
     sparkAdapter
-      
.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
+      
.createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
       .buildReaderWithPartitionValues(sparkSession, dataSchema, 
partitionSchema, requiredSchema, filters, options, hadoopConf)
   }
 }
 
-object HoodieParquetFileFormat {
+object LegacyHoodieParquetFileFormat {
   val FILE_FORMAT_ID = "hoodie-parquet"
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
new file mode 100644
index 00000000000..0c1c3c8e5ee
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import 
org.apache.hudi.DataSourceReadOptions.{REALTIME_PAYLOAD_COMBINE_OPT_VAL, 
REALTIME_SKIP_MERGE_OPT_VAL}
+import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{BaseFile, FileSlice, HoodieLogFile, 
HoodieRecord}
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils, 
HoodieTableSchema, HoodieTableState, LogFileIterator, 
MergeOnReadSnapshotRelation, PartitionFileSliceMapping, 
RecordMergingFileIterator, SkipMergeIterator, SparkAdapterSupport}
+import org.apache.spark.broadcast.Broadcast
+import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+import org.apache.spark.util.SerializableConfiguration
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.asScalaIteratorConverter
+
+/**
+ * This class does bootstrap and MOR merging so that we can use hadoopfs 
relation.
+ */
+class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState],
+                                 tableSchema: Broadcast[HoodieTableSchema],
+                                 tableName: String,
+                                 mergeType: String,
+                                 mandatoryFields: Seq[String],
+                                 isMOR: Boolean,
+                                 isBootstrap: Boolean) extends 
ParquetFileFormat with SparkAdapterSupport {
+
+  override def isSplitable(sparkSession: SparkSession,
+                           options: Map[String, String],
+                           path: Path): Boolean = {
+    false
+  }
+
+  //Used so that the planner only projects once and does not stack overflow
+  var isProjected = false
+
+  /**
+   * Support batch needs to remain consistent, even if one side of a bootstrap 
merge can support
+   * while the other side can't
+   */
+  private var supportBatchCalled = false
+  private var supportBatchResult = false
+  override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
+    if (!supportBatchCalled) {
+      supportBatchCalled = true
+      supportBatchResult = !isMOR && super.supportBatch(sparkSession, schema)
+    }
+    supportBatchResult
+  }
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
+
+    val outputSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
+
+    val requiredSchemaWithMandatory = if (!isMOR || 
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
+      //add mandatory fields to required schema
+      val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
+      for (field <- mandatoryFields) {
+        if (requiredSchema.getFieldIndex(field).isEmpty) {
+          val fieldToAdd = 
dataSchema.fields(dataSchema.getFieldIndex(field).get)
+          added.append(fieldToAdd)
+        }
+      }
+      val addedFields = StructType(added.toArray)
+      StructType(requiredSchema.toArray ++ addedFields.fields)
+    } else {
+      dataSchema
+    }
+
+    val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f 
=> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
+    val requiredMeta = StructType(requiredSchemaSplits._1)
+    val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
+    val needMetaCols = requiredMeta.nonEmpty
+    val needDataCols = requiredWithoutMeta.nonEmpty
+    // note: this is only the output of the bootstrap merge if isMOR. If it is 
only bootstrap then the
+    // output will just be outputSchema
+    val bootstrapReaderOutput = StructType(requiredMeta.fields ++ 
requiredWithoutMeta.fields)
+
+    val skeletonReaderAppend = needMetaCols && isBootstrap && !(needDataCols 
|| isMOR) && partitionSchema.nonEmpty
+    val bootstrapBaseAppend = needDataCols && isBootstrap && !isMOR && 
partitionSchema.nonEmpty
+
+    val (baseFileReader, preMergeBaseFileReader, skeletonReader, 
bootstrapBaseReader) = buildFileReaders(sparkSession,
+      dataSchema, partitionSchema, requiredSchema, filters, options, 
hadoopConf, requiredSchemaWithMandatory,
+      requiredWithoutMeta, requiredMeta)
+
+    val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
+    (file: PartitionedFile) => {
+      file.partitionValues match {
+        case broadcast: PartitionFileSliceMapping =>
+          val filePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+          if (FSUtils.isLogFile(filePath)) {
+            //no base file
+            val fileSlice = 
broadcast.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
+            val logFiles = getLogFilesFromSlice(fileSlice)
+            val outputAvroSchema = 
HoodieBaseRelation.convertToAvroSchema(outputSchema, tableName)
+            new LogFileIterator(logFiles, filePath.getParent, 
tableSchema.value, outputSchema, outputAvroSchema,
+              tableState.value, broadcastedHadoopConf.value.value)
+          } else {
+            //We do not broadcast the slice if it has no log files or 
bootstrap base
+            broadcast.getSlice(FSUtils.getFileId(filePath.getName)) match {
+              case Some(fileSlice) =>
+                val hoodieBaseFile = fileSlice.getBaseFile.get()
+                val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
+                val partitionValues = broadcast.getInternalRow
+                val logFiles = getLogFilesFromSlice(fileSlice)
+                if (requiredSchemaWithMandatory.isEmpty) {
+                  val baseFile = createPartitionedFile(partitionValues, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
+                  baseFileReader(baseFile)
+                } else if (bootstrapFileOpt.isPresent) {
+                  val bootstrapIterator = 
buildBootstrapIterator(skeletonReader, bootstrapBaseReader,
+                    skeletonReaderAppend, bootstrapBaseAppend, 
bootstrapFileOpt.get(), hoodieBaseFile, partitionValues,
+                    needMetaCols, needDataCols)
+                  (isMOR, logFiles.nonEmpty) match {
+                    case (true, true) => 
buildMergeOnReadIterator(bootstrapIterator, logFiles, filePath.getParent,
+                      bootstrapReaderOutput, requiredSchemaWithMandatory, 
outputSchema, partitionSchema, partitionValues,
+                      broadcastedHadoopConf.value.value)
+                    case (true, false) => 
appendPartitionAndProject(bootstrapIterator, bootstrapReaderOutput,
+                      partitionSchema, outputSchema, partitionValues)
+                    case (false, false) => bootstrapIterator
+                    case (false, true) => throw new 
IllegalStateException("should not be log files if not mor table")
+                  }
+                } else {
+                  if (logFiles.nonEmpty) {
+                    val baseFile = createPartitionedFile(InternalRow.empty, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
+                    buildMergeOnReadIterator(preMergeBaseFileReader(baseFile), 
logFiles, filePath.getParent, requiredSchemaWithMandatory,
+                      requiredSchemaWithMandatory, outputSchema, 
partitionSchema, partitionValues, broadcastedHadoopConf.value.value)
+                  } else {
+                    throw new IllegalStateException("should not be here since 
file slice should not have been broadcasted since it has no log or data files")
+                    //baseFileReader(baseFile)
+                  }
+                }
+              case _ => baseFileReader(file)
+            }
+          }
+        case _ => baseFileReader(file)
+      }
+    }
+  }
+
+  /**
+   * Build file readers to read individual physical files
+   */
+ protected def buildFileReaders(sparkSession: SparkSession, dataSchema: 
StructType, partitionSchema: StructType,
+                       requiredSchema: StructType, filters: Seq[Filter], 
options: Map[String, String],
+                       hadoopConf: Configuration, requiredSchemaWithMandatory: 
StructType,
+                       requiredWithoutMeta: StructType, requiredMeta: 
StructType):
+  (PartitionedFile => Iterator[InternalRow],
+    PartitionedFile => Iterator[InternalRow],
+    PartitionedFile => Iterator[InternalRow],
+    PartitionedFile => Iterator[InternalRow]) = {
+
+    //file reader when you just read a hudi parquet file and don't do any 
merging
+
+    val baseFileReader = super.buildReaderWithPartitionValues(sparkSession, 
dataSchema, partitionSchema, requiredSchema,
+      filters, options, new Configuration(hadoopConf))
+
+    //file reader for reading a hudi base file that needs to be merged with 
log files
+    val preMergeBaseFileReader = if (isMOR) {
+      super.buildReaderWithPartitionValues(sparkSession, dataSchema, 
StructType(Seq.empty),
+        requiredSchemaWithMandatory, Seq.empty, options, new 
Configuration(hadoopConf))
+    } else {
+      _: PartitionedFile => Iterator.empty
+    }
+
+    //Rules for appending partitions and filtering in the bootstrap readers:
+    // 1. if it is mor, we don't want to filter data or append partitions
+    // 2. if we need to merge the bootstrap base and skeleton files then we 
cannot filter
+    // 3. if we need to merge the bootstrap base and skeleton files then we 
should never append partitions to the
+    //    skeleton reader
+
+    val needMetaCols = requiredMeta.nonEmpty
+    val needDataCols = requiredWithoutMeta.nonEmpty
+
+    //file reader for bootstrap skeleton files
+    val skeletonReader = if (needMetaCols && isBootstrap) {
+      if (needDataCols || isMOR) {
+        // no filter and no append
+        super.buildReaderWithPartitionValues(sparkSession, 
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
+          requiredMeta, Seq.empty, options, new Configuration(hadoopConf))
+      } else {
+        // filter and append
+        super.buildReaderWithPartitionValues(sparkSession, 
HoodieSparkUtils.getMetaSchema, partitionSchema,
+          requiredMeta, filters, options, new Configuration(hadoopConf))
+      }
+    } else {
+      _: PartitionedFile => Iterator.empty
+    }
+
+    //file reader for bootstrap base files
+    val bootstrapBaseReader = if (needDataCols && isBootstrap) {
+      val dataSchemaWithoutMeta = StructType(dataSchema.fields.filterNot(sf => 
isMetaField(sf.name)))
+      if (isMOR) {
+        // no filter and no append
+        super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
+          Seq.empty, options, new Configuration(hadoopConf))
+      } else if (needMetaCols) {
+        // no filter but append
+        super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
+          Seq.empty, options, new Configuration(hadoopConf))
+      } else {
+        // filter and append
+        super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
+          filters, options, new Configuration(hadoopConf))
+      }
+    } else {
+      _: PartitionedFile => Iterator.empty
+    }
+
+    (baseFileReader, preMergeBaseFileReader, skeletonReader, 
bootstrapBaseReader)
+  }
+
+  /**
+   * Create iterator for a file slice that has bootstrap base and skeleton file
+   */
+  protected def buildBootstrapIterator(skeletonReader: PartitionedFile => 
Iterator[InternalRow],
+                             bootstrapBaseReader: PartitionedFile => 
Iterator[InternalRow],
+                             skeletonReaderAppend: Boolean, 
bootstrapBaseAppend: Boolean,
+                             bootstrapBaseFile: BaseFile, hoodieBaseFile: 
BaseFile,
+                             partitionValues: InternalRow, needMetaCols: 
Boolean,
+                             needDataCols: Boolean): Iterator[InternalRow] = {
+    lazy val skeletonFile = if (skeletonReaderAppend) {
+      createPartitionedFile(partitionValues, hoodieBaseFile.getHadoopPath, 0, 
hoodieBaseFile.getFileLen)
+    } else {
+      createPartitionedFile(InternalRow.empty, hoodieBaseFile.getHadoopPath, 
0, hoodieBaseFile.getFileLen)
+    }
+
+    lazy val dataFile = if (bootstrapBaseAppend) {
+      createPartitionedFile(partitionValues, bootstrapBaseFile.getHadoopPath, 
0, bootstrapBaseFile.getFileLen)
+    } else {
+      createPartitionedFile(InternalRow.empty, 
bootstrapBaseFile.getHadoopPath, 0, bootstrapBaseFile.getFileLen)
+    }
+
+    lazy val skeletonIterator = skeletonReader(skeletonFile)
+    lazy val dataFileIterator = bootstrapBaseReader(dataFile)
+
+    (needMetaCols, needDataCols) match {
+      case (true, true) => doBootstrapMerge(skeletonIterator, dataFileIterator)
+      case (true, false) => skeletonIterator
+      case (false, true) => dataFileIterator
+      case (false, false) => throw new IllegalStateException("should not be 
here if only partition cols are required")
+    }
+  }
+
+  /**
+   * Merge skeleton and data file iterators
+   */
+  protected def doBootstrapMerge(skeletonFileIterator: Iterator[Any], 
dataFileIterator: Iterator[Any]): Iterator[InternalRow] = {
+    new Iterator[Any] {
+      val combinedRow = new JoinedRow()
+
+      override def hasNext: Boolean = {
+        checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
+          "Bootstrap data-file iterator and skeleton-file iterator have to be 
in-sync!")
+        dataFileIterator.hasNext && skeletonFileIterator.hasNext
+      }
+
+      override def next(): Any = {
+        (skeletonFileIterator.next(), dataFileIterator.next()) match {
+          case (s: ColumnarBatch, d: ColumnarBatch) =>
+            val numCols = s.numCols() + d.numCols()
+            val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
+            for (i <- 0 until numCols) {
+              if (i < s.numCols()) {
+                vecs(i) = s.column(i)
+              } else {
+                vecs(i) = d.column(i - s.numCols())
+              }
+            }
+            assert(s.numRows() == d.numRows())
+            sparkAdapter.makeColumnarBatch(vecs, s.numRows())
+          case (_: ColumnarBatch, _: InternalRow) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
+          case (_: InternalRow, _: ColumnarBatch) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
+          case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+        }
+      }
+    }.asInstanceOf[Iterator[InternalRow]]
+  }
+
+  /**
+   * Create iterator for a file slice that has log files
+   */
+  protected def buildMergeOnReadIterator(iter: Iterator[InternalRow], 
logFiles: List[HoodieLogFile],
+                               partitionPath: Path, inputSchema: StructType, 
requiredSchemaWithMandatory: StructType,
+                               outputSchema: StructType, partitionSchema: 
StructType, partitionValues: InternalRow,
+                               hadoopConf: Configuration): 
Iterator[InternalRow] = {
+
+    val requiredAvroSchema = 
HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, tableName)
+    val morIterator =  mergeType match {
+      case REALTIME_SKIP_MERGE_OPT_VAL => throw new 
UnsupportedOperationException("Skip merge is not currently " +
+        "implemented for the New Hudi Parquet File format")
+        //new SkipMergeIterator(logFiles, partitionPath, iter, inputSchema, 
tableSchema.value,
+        //  requiredSchemaWithMandatory, requiredAvroSchema, tableState.value, 
hadoopConf)
+      case REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
+        new RecordMergingFileIterator(logFiles, partitionPath, iter, 
inputSchema, tableSchema.value,
+          requiredSchemaWithMandatory, requiredAvroSchema, tableState.value, 
hadoopConf)
+    }
+    appendPartitionAndProject(morIterator, requiredSchemaWithMandatory, 
partitionSchema,
+      outputSchema, partitionValues)
+  }
+
+  /**
+   * Append partition values to rows and project to output schema
+   */
+  protected def appendPartitionAndProject(iter: Iterator[InternalRow],
+                                inputSchema: StructType,
+                                partitionSchema: StructType,
+                                to: StructType,
+                                partitionValues: InternalRow): 
Iterator[InternalRow] = {
+    if (partitionSchema.isEmpty) {
+      projectSchema(iter, inputSchema, to)
+    } else {
+      val unsafeProjection = 
generateUnsafeProjection(StructType(inputSchema.fields ++ 
partitionSchema.fields), to)
+      val joinedRow = new JoinedRow()
+      iter.map(d => unsafeProjection(joinedRow(d, partitionValues)))
+    }
+  }
+
+  protected def projectSchema(iter: Iterator[InternalRow],
+                    from: StructType,
+                    to: StructType): Iterator[InternalRow] = {
+    val unsafeProjection = generateUnsafeProjection(from, to)
+    iter.map(d => unsafeProjection(d))
+  }
+
+  protected def getLogFilesFromSlice(fileSlice: FileSlice): 
List[HoodieLogFile] = {
+    
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 1f25493e5e9..3c2d41aa582 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -265,6 +265,9 @@ object HoodieAnalysis extends SparkAdapterSupport {
 
           case ut @ UpdateTable(relation @ ResolvesToHudiTable(_), _, _) =>
             ut.copy(table = relation)
+
+          case logicalPlan: LogicalPlan if logicalPlan.resolved =>
+            
sparkAdapter.getCatalystPlanUtils.applyNewHoodieParquetFileFormatProjection(logicalPlan)
         }
       }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
index bbce1c61f0f..f57be60461a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
@@ -18,86 +18,27 @@
 
 package org.apache.hudi.functional;
 
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector;
-import 
org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
-import 
org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
-import 
org.apache.hudi.client.bootstrap.translator.DecodedBootstrapPartitionPathTranslator;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.config.HoodieBootstrapConfig;
-import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.keygen.ComplexKeyGenerator;
-import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
-import org.apache.hudi.keygen.SimpleKeyGenerator;
-import org.apache.hudi.testutils.HoodieSparkClientTestBase;
 
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.functions;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
 import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
-import static 
org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
-import static org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * Tests different layouts for bootstrap base path
  */
 @Tag("functional")
-public class TestBootstrapRead extends HoodieSparkClientTestBase {
-
-  @TempDir
-  public java.nio.file.Path tmpFolder;
-  protected String bootstrapBasePath = null;
-  protected String bootstrapTargetPath = null;
-  protected String hudiBasePath = null;
-
-  protected static int nInserts = 100;
-  protected static int nUpdates = 20;
-  protected static String[] dashPartitionPaths = {"2016-03-14","2016-03-15", 
"2015-03-16", "2015-03-17"};
-  protected static String[] slashPartitionPaths = {"2016/03/15", "2015/03/16", 
"2015/03/17"};
-  protected String bootstrapType;
-  protected Boolean dashPartitions;
-  protected HoodieTableType tableType;
-  protected Integer nPartitions;
-
-  protected String[] partitionCols;
-  protected static String[] dropColumns = {"_hoodie_commit_time", 
"_hoodie_commit_seqno", "_hoodie_record_key",  "_hoodie_file_name", 
"city_to_state"};
-
-  @BeforeEach
-  public void setUp() throws Exception {
-    bootstrapBasePath = tmpFolder.toAbsolutePath() + "/bootstrapBasePath";
-    hudiBasePath = tmpFolder.toAbsolutePath() + "/hudiBasePath";
-    bootstrapTargetPath = tmpFolder.toAbsolutePath() + "/bootstrapTargetPath";
-    initSparkContexts();
-  }
-
-  @AfterEach
-  public void tearDown() throws IOException {
-    cleanupSparkContexts();
-    cleanupClients();
-    cleanupTestDataGenerator();
-  }
-
+public class TestBootstrapRead extends TestBootstrapReadBase {
   private static Stream<Arguments> testArgs() {
     Stream.Builder<Arguments> b = Stream.builder();
     String[] bootstrapType = {"full", "metadata", "mixed"};
@@ -149,193 +90,4 @@ public class TestBootstrapRead extends 
HoodieSparkClientTestBase {
     compareTables();
     verifyMetaColOnlyRead(2);
   }
-
-  protected Map<String, String> basicOptions() {
-    Map<String, String> options = new HashMap<>();
-    options.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType.name());
-    options.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), 
"true");
-    options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
-    if (nPartitions == 0) {
-      options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), 
NonpartitionedKeyGenerator.class.getName());
-    } else {
-      options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), 
String.join(",", partitionCols));
-      if (nPartitions == 1) {
-        options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), 
SimpleKeyGenerator.class.getName());
-      } else {
-        options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), 
ComplexKeyGenerator.class.getName());
-      }
-    }
-    options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
-    if (tableType.equals(MERGE_ON_READ)) {
-      options.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
-    }
-    options.put(HoodieWriteConfig.TBL_NAME.key(), "test");
-    return options;
-  }
-
-  protected Map<String, String> setBootstrapOptions() {
-    Map<String, String> options = basicOptions();
-    options.put(DataSourceWriteOptions.OPERATION().key(), 
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL());
-    options.put(HoodieBootstrapConfig.BASE_PATH.key(), bootstrapBasePath);
-    if (!dashPartitions) {
-      
options.put(HoodieBootstrapConfig.PARTITION_PATH_TRANSLATOR_CLASS_NAME.key(), 
DecodedBootstrapPartitionPathTranslator.class.getName());
-    }
-    switch (bootstrapType) {
-      case "metadata":
-        options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), 
MetadataOnlyBootstrapModeSelector.class.getName());
-        break;
-      case "full":
-        options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), 
FullRecordBootstrapModeSelector.class.getName());
-        break;
-      case "mixed":
-        options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), 
BootstrapRegexModeSelector.class.getName());
-        String regexPattern;
-        if (dashPartitions) {
-          regexPattern = "partition_path=2015-03-1[5-7]";
-        } else {
-          regexPattern = "partition_path=2015%2F03%2F1[5-7]";
-        }
-        if (nPartitions > 1) {
-          regexPattern = regexPattern + "\\/.*";
-        }
-        
options.put(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_PATTERN.key(), 
regexPattern);
-        break;
-      default:
-        throw new RuntimeException();
-    }
-    return options;
-  }
-
-  protected void doUpdate(Map<String,String> options, String instantTime) {
-    Dataset<Row> updates = generateTestUpdates(instantTime, nUpdates);
-    doUpsert(options, updates);
-  }
-
-  protected void doInsert(Map<String,String> options, String instantTime) {
-    Dataset<Row> inserts = generateTestInserts(instantTime, nUpdates);
-    doUpsert(options, inserts);
-  }
-
-  protected void doUpsert(Map<String,String> options, Dataset<Row> df) {
-    String nCompactCommits = "3";
-    df.write().format("hudi")
-        .options(options)
-        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 
nCompactCommits)
-        .mode(SaveMode.Append)
-        .save(hudiBasePath);
-    if (bootstrapType.equals("mixed")) {
-      // mixed tables have a commit for each of the metadata and full 
bootstrap modes
-      // so to align with the regular hudi table, we need to compact after 4 
commits instead of 3
-      nCompactCommits = "4";
-    }
-    df.write().format("hudi")
-        .options(options)
-        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 
nCompactCommits)
-        .mode(SaveMode.Append)
-        .save(bootstrapTargetPath);
-  }
-
-  protected void compareTables() {
-    Dataset<Row> hudiDf = 
sparkSession.read().format("hudi").load(hudiBasePath);
-    Dataset<Row> bootstrapDf = 
sparkSession.read().format("hudi").load(bootstrapTargetPath);
-    Dataset<Row> fastBootstrapDf = 
sparkSession.read().format("hudi").option(DATA_QUERIES_ONLY.key(), 
"true").load(bootstrapTargetPath);
-    if (nPartitions == 0) {
-      compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns));
-      if (tableType.equals(COPY_ON_WRITE)) {
-        compareDf(fastBootstrapDf.drop("city_to_state"), 
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path"));
-      }
-      return;
-    }
-    compareDf(hudiDf.drop(dropColumns).drop(partitionCols), 
bootstrapDf.drop(dropColumns).drop(partitionCols));
-    compareDf(hudiDf.select("_row_key",partitionCols), 
bootstrapDf.select("_row_key",partitionCols));
-    if (tableType.equals(COPY_ON_WRITE)) {
-      compareDf(fastBootstrapDf.drop("city_to_state").drop(partitionCols), 
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path").drop(partitionCols));
-      compareDf(fastBootstrapDf.select("_row_key",partitionCols), 
bootstrapDf.select("_row_key",partitionCols));
-    }
-  }
-
-  protected void verifyMetaColOnlyRead(Integer iteration) {
-    Dataset<Row> hudiDf = 
sparkSession.read().format("hudi").load(hudiBasePath).select("_hoodie_commit_time",
 "_hoodie_record_key");
-    Dataset<Row> bootstrapDf = 
sparkSession.read().format("hudi").load(bootstrapTargetPath).select("_hoodie_commit_time",
 "_hoodie_record_key");
-    hudiDf.show(100,false);
-    bootstrapDf.show(100,false);
-    if (iteration > 0) {
-      assertEquals(sparkSession.sql("select * from hudi_iteration_" + 
(iteration - 1)).intersect(hudiDf).count(),
-          sparkSession.sql("select * from bootstrap_iteration_" + (iteration - 
1)).intersect(bootstrapDf).count());
-    }
-    hudiDf.createOrReplaceTempView("hudi_iteration_" + iteration);
-    bootstrapDf.createOrReplaceTempView("bootstrap_iteration_" + iteration);
-  }
-
-  protected void compareDf(Dataset<Row> df1, Dataset<Row> df2) {
-    assertEquals(0, df1.except(df2).count());
-    assertEquals(0, df2.except(df1).count());
-  }
-
-  protected void setupDirs()  {
-    dataGen = new HoodieTestDataGenerator(dashPartitions ? dashPartitionPaths 
: slashPartitionPaths);
-    Dataset<Row> inserts = generateTestInserts("000", nInserts);
-    if (dashPartitions) {
-      //test adding a partition to the table
-      inserts = inserts.filter("partition_path != '2016-03-14'");
-    }
-    if (nPartitions > 0) {
-      partitionCols = new String[nPartitions];
-      partitionCols[0] = "partition_path";
-      for (int i = 1; i < partitionCols.length; i++) {
-        partitionCols[i] = "partpath" + (i + 1);
-      }
-      inserts.write().partitionBy(partitionCols).save(bootstrapBasePath);
-    } else {
-      inserts.write().save(bootstrapBasePath);
-    }
-
-    inserts.write().format("hudi")
-        .options(basicOptions())
-        .mode(SaveMode.Overwrite)
-        .save(hudiBasePath);
-  }
-
-  protected Dataset<Row> makeInsertDf(String instantTime, Integer n) {
-    List<String> records = dataGen.generateInserts(instantTime, n).stream()
-        .map(r -> recordToString(r).get()).collect(Collectors.toList());
-    JavaRDD<String> rdd = jsc.parallelize(records);
-    return sparkSession.read().json(rdd);
-  }
-
-  protected Dataset<Row> generateTestInserts(String instantTime, Integer n) {
-    return addPartitionColumns(makeInsertDf(instantTime, n), nPartitions);
-  }
-
-  protected Dataset<Row> makeUpdateDf(String instantTime, Integer n) {
-    try {
-      List<String> records = dataGen.generateUpdates(instantTime, n).stream()
-          .map(r -> recordToString(r).get()).collect(Collectors.toList());
-      JavaRDD<String> rdd = jsc.parallelize(records);
-      return sparkSession.read().json(rdd);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  protected Dataset<Row> generateTestUpdates(String instantTime, Integer n) {
-    return addPartitionColumns(makeUpdateDf(instantTime, n), nPartitions);
-  }
-
-  private static Dataset<Row> addPartitionColumns(Dataset<Row> df, Integer 
nPartitions) {
-    if (nPartitions < 2) {
-      return df;
-    }
-    for (int i = 2; i <= nPartitions; i++) {
-      df = applyPartition(df, i);
-    }
-    return df;
-  }
-
-  private static Dataset<Row> applyPartition(Dataset<Row> df, Integer n) {
-    return df.withColumn("partpath" + n,
-        functions.md5(functions.concat_ws("," + n + ",",
-            df.col("partition_path"),
-            functions.hash(df.col("_row_key")).mod(n))));
-  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
similarity index 84%
copy from 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
copy to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
index bbce1c61f0f..d3246f7c4da 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
@@ -42,28 +42,22 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
+import static 
org.apache.hudi.DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT;
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
 import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
 import static 
org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
 import static org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-/**
- * Tests different layouts for bootstrap base path
- */
 @Tag("functional")
-public class TestBootstrapRead extends HoodieSparkClientTestBase {
+public abstract class TestBootstrapReadBase extends HoodieSparkClientTestBase {
 
   @TempDir
   public java.nio.file.Path tmpFolder;
@@ -98,58 +92,6 @@ public class TestBootstrapRead extends 
HoodieSparkClientTestBase {
     cleanupTestDataGenerator();
   }
 
-  private static Stream<Arguments> testArgs() {
-    Stream.Builder<Arguments> b = Stream.builder();
-    String[] bootstrapType = {"full", "metadata", "mixed"};
-    Boolean[] dashPartitions = {true,false};
-    HoodieTableType[] tableType = {COPY_ON_WRITE, MERGE_ON_READ};
-    Integer[] nPartitions = {0, 1, 2};
-    for (HoodieTableType tt : tableType) {
-      for (Boolean dash : dashPartitions) {
-        for (String bt : bootstrapType) {
-          for (Integer n : nPartitions) {
-            // can't be mixed bootstrap if it's nonpartitioned
-            // don't need to test slash partitions if it's nonpartitioned
-            if ((!bt.equals("mixed") && dash) || n > 0) {
-              b.add(Arguments.of(bt, dash, tt, n));
-            }
-          }
-        }
-      }
-    }
-    return b.build();
-  }
-
-  @ParameterizedTest
-  @MethodSource("testArgs")
-  public void runTests(String bootstrapType, Boolean dashPartitions, 
HoodieTableType tableType, Integer nPartitions) {
-    this.bootstrapType = bootstrapType;
-    this.dashPartitions = dashPartitions;
-    this.tableType = tableType;
-    this.nPartitions = nPartitions;
-    setupDirs();
-
-    // do bootstrap
-    Map<String, String> options = setBootstrapOptions();
-    Dataset<Row> bootstrapDf = sparkSession.emptyDataFrame();
-    bootstrapDf.write().format("hudi")
-        .options(options)
-        .mode(SaveMode.Overwrite)
-        .save(bootstrapTargetPath);
-    compareTables();
-    verifyMetaColOnlyRead(0);
-
-    // do upserts
-    options = basicOptions();
-    doUpdate(options, "001");
-    compareTables();
-    verifyMetaColOnlyRead(1);
-
-    doInsert(options, "002");
-    compareTables();
-    verifyMetaColOnlyRead(2);
-  }
-
   protected Map<String, String> basicOptions() {
     Map<String, String> options = new HashMap<>();
     options.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType.name());
@@ -216,6 +158,11 @@ public class TestBootstrapRead extends 
HoodieSparkClientTestBase {
     doUpsert(options, inserts);
   }
 
+  protected void doDelete(Map<String,String> options, String instantTime) {
+    Dataset<Row> deletes = generateTestDeletes(instantTime, nUpdates);
+    doUpsert(options, deletes);
+  }
+
   protected void doUpsert(Map<String,String> options, Dataset<Row> df) {
     String nCompactCommits = "3";
     df.write().format("hudi")
@@ -239,16 +186,17 @@ public class TestBootstrapRead extends 
HoodieSparkClientTestBase {
     Dataset<Row> hudiDf = 
sparkSession.read().format("hudi").load(hudiBasePath);
     Dataset<Row> bootstrapDf = 
sparkSession.read().format("hudi").load(bootstrapTargetPath);
     Dataset<Row> fastBootstrapDf = 
sparkSession.read().format("hudi").option(DATA_QUERIES_ONLY.key(), 
"true").load(bootstrapTargetPath);
+    boolean shouldTestFastBootstrap = tableType.equals(COPY_ON_WRITE) && 
!Boolean.parseBoolean(USE_NEW_HUDI_PARQUET_FILE_FORMAT().defaultValue());
     if (nPartitions == 0) {
       compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns));
-      if (tableType.equals(COPY_ON_WRITE)) {
+      if (shouldTestFastBootstrap) {
         compareDf(fastBootstrapDf.drop("city_to_state"), 
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path"));
       }
       return;
     }
     compareDf(hudiDf.drop(dropColumns).drop(partitionCols), 
bootstrapDf.drop(dropColumns).drop(partitionCols));
     compareDf(hudiDf.select("_row_key",partitionCols), 
bootstrapDf.select("_row_key",partitionCols));
-    if (tableType.equals(COPY_ON_WRITE)) {
+    if (shouldTestFastBootstrap) {
       compareDf(fastBootstrapDf.drop("city_to_state").drop(partitionCols), 
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path").drop(partitionCols));
       compareDf(fastBootstrapDf.select("_row_key",partitionCols), 
bootstrapDf.select("_row_key",partitionCols));
     }
@@ -296,6 +244,17 @@ public class TestBootstrapRead extends 
HoodieSparkClientTestBase {
         .save(hudiBasePath);
   }
 
+  protected Dataset<Row> makeDeleteDf(String instantTime, Integer n) {
+    List<String> records = dataGen.generateUniqueDeleteRecords(instantTime, 
n).stream()
+        .map(r -> recordToString(r).get()).collect(Collectors.toList());
+    JavaRDD<String> rdd = jsc.parallelize(records);
+    return sparkSession.read().json(rdd);
+  }
+
+  protected Dataset<Row> generateTestDeletes(String instantTime, Integer n) {
+    return addPartitionColumns(makeDeleteDf(instantTime, n), nPartitions);
+  }
+
   protected Dataset<Row> makeInsertDf(String instantTime, Integer n) {
     List<String> records = dataGen.generateInserts(instantTime, n).stream()
         .map(r -> recordToString(r).get()).collect(Collectors.toList());
@@ -322,7 +281,7 @@ public class TestBootstrapRead extends 
HoodieSparkClientTestBase {
     return addPartitionColumns(makeUpdateDf(instantTime, n), nPartitions);
   }
 
-  private static Dataset<Row> addPartitionColumns(Dataset<Row> df, Integer 
nPartitions) {
+  protected static Dataset<Row> addPartitionColumns(Dataset<Row> df, Integer 
nPartitions) {
     if (nPartitions < 2) {
       return df;
     }
@@ -332,7 +291,7 @@ public class TestBootstrapRead extends 
HoodieSparkClientTestBase {
     return df;
   }
 
-  private static Dataset<Row> applyPartition(Dataset<Row> df, Integer n) {
+  protected static Dataset<Row> applyPartition(Dataset<Row> df, Integer n) {
     return df.withColumn("partpath" + n,
         functions.md5(functions.concat_ws("," + n + ",",
             df.col("partition_path"),
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
new file mode 100644
index 00000000000..ef6814f21c5
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.common.model.HoodieTableType;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("functional")
+public class TestNewHoodieParquetFileFormat extends TestBootstrapReadBase {
+
+  private static Stream<Arguments> testArgs() {
+    Stream.Builder<Arguments> b = Stream.builder();
+    HoodieTableType[] tableType = {COPY_ON_WRITE, MERGE_ON_READ};
+    Integer[] nPartitions = {0, 1, 2};
+    for (HoodieTableType tt : tableType) {
+      for (Integer n : nPartitions) {
+        b.add(Arguments.of(tt, n));
+      }
+    }
+    return b.build();
+  }
+
+  @ParameterizedTest
+  @MethodSource("testArgs")
+  public void runTests(HoodieTableType tableType, Integer nPartitions) {
+    this.bootstrapType = nPartitions == 0 ? "metadata" : "mixed";
+    this.dashPartitions = true;
+    this.tableType = tableType;
+    this.nPartitions = nPartitions;
+    setupDirs();
+
+    //do bootstrap
+    Map<String, String> options = setBootstrapOptions();
+    Dataset<Row> bootstrapDf = sparkSession.emptyDataFrame();
+    bootstrapDf.write().format("hudi")
+        .options(options)
+        .mode(SaveMode.Overwrite)
+        .save(bootstrapTargetPath);
+    runComparisons();
+
+    options = basicOptions();
+    doUpdate(options, "001");
+    runComparisons();
+
+    doInsert(options, "002");
+    runComparisons();
+
+    doDelete(options, "003");
+    runComparisons();
+  }
+
+  protected void runComparisons() {
+    if (tableType.equals(MERGE_ON_READ)) {
+      runComparison(hudiBasePath);
+    }
+    runComparison(bootstrapTargetPath);
+  }
+
+  protected void runComparison(String tableBasePath) {
+    testCount(tableBasePath);
+    runIndividualComparison(tableBasePath);
+    runIndividualComparison(tableBasePath, "partition_path");
+    runIndividualComparison(tableBasePath, "_hoodie_record_key", 
"_hoodie_commit_time", "_hoodie_partition_path");
+    runIndividualComparison(tableBasePath, "_hoodie_commit_time", 
"_hoodie_commit_seqno");
+    runIndividualComparison(tableBasePath, "_hoodie_commit_time", 
"_hoodie_commit_seqno", "partition_path");
+    runIndividualComparison(tableBasePath, "_row_key", "_hoodie_commit_seqno", 
"_hoodie_record_key", "_hoodie_partition_path");
+    runIndividualComparison(tableBasePath, "_row_key", "partition_path", 
"_hoodie_is_deleted", "begin_lon");
+  }
+
+  protected void testCount(String tableBasePath) {
+    Dataset<Row> legacyDf = sparkSession.read().format("hudi")
+        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), 
"false").load(tableBasePath);
+    Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
+        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), 
"true").load(tableBasePath);
+    assertEquals(legacyDf.count(), fileFormatDf.count());
+  }
+
+  protected scala.collection.Seq<String> seq(String... a) {
+    return 
scala.collection.JavaConverters.asScalaBufferConverter(Arrays.asList(a)).asScala().toSeq();
+  }
+
+  protected void runIndividualComparison(String tableBasePath) {
+    runIndividualComparison(tableBasePath, "");
+  }
+
+  protected void runIndividualComparison(String tableBasePath, String 
firstColumn, String... columns) {
+    Dataset<Row> legacyDf = sparkSession.read().format("hudi")
+        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), 
"false").load(tableBasePath);
+    Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
+        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), 
"true").load(tableBasePath);
+    if (firstColumn.isEmpty()) {
+      //df.except(df) does not work with map type cols
+      legacyDf = legacyDf.drop("city_to_state");
+      fileFormatDf = fileFormatDf.drop("city_to_state");
+    } else {
+      if (columns.length > 0) {
+        legacyDf = legacyDf.select(firstColumn, columns);
+        fileFormatDf = fileFormatDf.select(firstColumn, columns);
+      } else {
+        legacyDf = legacyDf.select(firstColumn);
+        fileFormatDf = fileFormatDf.select(firstColumn);
+      }
+    }
+    compareDf(legacyDf, fileFormatDf);
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 9f551a0bf1d..cdb4c5226a6 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -17,13 +17,17 @@
 
 package org.apache.spark.sql
 
+import org.apache.hudi.SparkHoodieTableFileIndex
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, 
LogicalPlan, MergeIntoTable}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, 
LogicalPlan, MergeIntoTable, Project}
 import 
org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, 
ExplainCommand}
+import 
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
 
 object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils {
@@ -84,4 +88,14 @@ object HoodieSpark2CatalystPlanUtils extends 
HoodieCatalystPlansUtils {
   override def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType: 
JoinType, condition: Option[Expression], hint: String): LogicalPlan = {
     Join(left, right, joinType, condition)
   }
+
+  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
+    plan match {
+      case p@PhysicalOperation(_, _,
+      l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if 
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && 
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
+        fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = 
true
+        
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, 
fs.sparkSession.sessionState.analyzer.resolver), p)
+      case _ => plan
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index cc2e25f9891..ec275a1d3fd 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -33,13 +33,14 @@ import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, 
Join, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark24HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark24LegacyHoodieParquetFileFormat}
 import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
 import org.apache.spark.sql.parser.HoodieExtendedParserInterface
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, 
StructType}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel._
 
@@ -143,8 +144,8 @@ class Spark2Adapter extends SparkAdapter {
     partitions.toSeq
   }
 
-  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): 
Option[ParquetFileFormat] = {
-    Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
+  override def createLegacyHoodieParquetFileFormat(appendPartitionValues: 
Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark24LegacyHoodieParquetFileFormat(appendPartitionValues))
   }
 
   override def createInterpretedPredicate(e: Expression): InterpretedPredicate 
= {
@@ -200,4 +201,10 @@ class Spark2Adapter extends SparkAdapter {
 
     DataSourceStrategy.translateFilter(predicate)
   }
+
+  override def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int): 
ColumnarBatch = {
+    val batch = new ColumnarBatch(vectors)
+    batch.setNumRows(numRows)
+    batch
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24LegacyHoodieParquetFileFormat.scala
similarity index 99%
rename from 
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
rename to 
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24LegacyHoodieParquetFileFormat.scala
index c168911302e..a6a4ab943a1 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24LegacyHoodieParquetFileFormat.scala
@@ -50,7 +50,7 @@ import java.net.URI
  *   <li>Avoiding appending partition values to the rows read from the data 
file</li>
  * </ol>
  */
-class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: 
Boolean) extends ParquetFileFormat {
+class Spark24LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
 
   override def buildReaderWithPartitionValues(sparkSession: SparkSession,
                                               dataSchema: StructType,
diff --git 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index ce9499ae7d2..b2a9a529511 100644
--- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SQLContext, 
SparkSession}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
 import org.apache.spark.storage.StorageLevel
 
 import java.time.ZoneId
@@ -97,4 +98,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter with 
Logging {
                                supportNestedPredicatePushdown: Boolean = 
false): Option[Filter] = {
     DataSourceStrategy.translateFilter(predicate, 
supportNestedPredicatePushdown)
   }
+
+  override def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int): 
ColumnarBatch = {
+    new ColumnarBatch(vectors, numRows)
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
index aefc6af7e3e..e9757c821d9 100644
--- 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
@@ -18,12 +18,16 @@
 
 package org.apache.spark.sql
 
+import org.apache.hudi.SparkHoodieTableFileIndex
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.ResolvedTable
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, 
ProjectionOverSchema}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable, Project}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import 
org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
+import 
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.types.StructType
 
 object HoodieSpark30CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -42,6 +46,16 @@ object HoodieSpark30CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
     }
   }
 
+  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
+    plan match {
+      case s@ScanOperation(_, _,
+      l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if 
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && 
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
+        fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = 
true
+        
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, 
fs.sparkSession.sessionState.analyzer.resolver), s)
+      case _ => plan
+    }
+  }
+
   override def projectOverSchema(schema: StructType, output: AttributeSet): 
ProjectionOverSchema = ProjectionOverSchema(schema)
 
   override def isRepairTable(plan: LogicalPlan): Boolean = {
diff --git 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala
index 5abd46948eb..22a9f090fb3 100644
--- 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala
@@ -29,7 +29,7 @@ import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark30HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark30LegacyHoodieParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, 
HoodieSpark30PartitionedFileUtils, HoodieSparkPartitionedFileUtils, 
PartitionedFile}
 import org.apache.spark.sql.hudi.SparkAdapter
@@ -84,8 +84,8 @@ class Spark3_0Adapter extends BaseSpark3Adapter {
   override def createExtendedSparkParser(spark: SparkSession, delegate: 
ParserInterface): HoodieExtendedParserInterface =
     new HoodieSpark3_0ExtendedSqlParser(spark, delegate)
 
-  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): 
Option[ParquetFileFormat] = {
-    Some(new Spark30HoodieParquetFileFormat(appendPartitionValues))
+  override def createLegacyHoodieParquetFileFormat(appendPartitionValues: 
Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark30LegacyHoodieParquetFileFormat(appendPartitionValues))
   }
 
   override def createHoodieFileScanRDD(sparkSession: SparkSession,
diff --git 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30HoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
similarity index 98%
rename from 
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30HoodieParquetFileFormat.scala
rename to 
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
index 4c33ac89677..de0be0db04b 100644
--- 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30HoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30LegacyHoodieParquetFileFormat.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import 
org.apache.spark.sql.execution.datasources.parquet.Spark30HoodieParquetFileFormat.{createParquetFilters,
 pruneInternalSchema, rebuildFilterFromParquet}
+import 
org.apache.spark.sql.execution.datasources.parquet.Spark30LegacyHoodieParquetFileFormat.{createParquetFilters,
 pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, 
PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -61,7 +61,7 @@ import java.net.URI
  *   <li>Schema on-read</li>
  * </ol>
  */
-class Spark30HoodieParquetFileFormat(private val shouldAppendPartitionValues: 
Boolean) extends ParquetFileFormat {
+class Spark30LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
 
   override def buildReaderWithPartitionValues(sparkSession: SparkSession,
                                               dataSchema: StructType,
@@ -336,7 +336,7 @@ class Spark30HoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Bo
   }
 }
 
-object Spark30HoodieParquetFileFormat {
+object Spark30LegacyHoodieParquetFileFormat {
 
   def pruneInternalSchema(internalSchemaStr: String, requiredSchema: 
StructType): String = {
     val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
diff --git 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
index 2fcc88d4b37..df94529ce12 100644
--- 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
@@ -18,12 +18,16 @@
 
 package org.apache.spark.sql
 
+import org.apache.hudi.SparkHoodieTableFileIndex
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.ResolvedTable
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, 
ProjectionOverSchema}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable, Project}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import 
org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
+import 
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.types.StructType
 
 object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -42,6 +46,16 @@ object HoodieSpark31CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
     }
   }
 
+  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
+    plan match {
+      case s@ScanOperation(_, _,
+      l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if 
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && 
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
+        fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = 
true
+        
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, 
fs.sparkSession.sessionState.analyzer.resolver), s)
+      case _ => plan
+    }
+  }
+
   override def projectOverSchema(schema: StructType, output: AttributeSet): 
ProjectionOverSchema = ProjectionOverSchema(schema)
 
   override def isRepairTable(plan: LogicalPlan): Boolean = {
diff --git 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
index 547309c0e10..8ca072333d0 100644
--- 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark31HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark31LegacyHoodieParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, 
HoodieSpark31PartitionedFileUtils, HoodieSparkPartitionedFileUtils, 
PartitionedFile}
 import org.apache.spark.sql.hudi.SparkAdapter
@@ -85,8 +85,8 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
   override def createExtendedSparkParser(spark: SparkSession, delegate: 
ParserInterface): HoodieExtendedParserInterface =
     new HoodieSpark3_1ExtendedSqlParser(spark, delegate)
 
-  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): 
Option[ParquetFileFormat] = {
-    Some(new Spark31HoodieParquetFileFormat(appendPartitionValues))
+  override def createLegacyHoodieParquetFileFormat(appendPartitionValues: 
Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark31LegacyHoodieParquetFileFormat(appendPartitionValues))
   }
 
   override def createHoodieFileScanRDD(sparkSession: SparkSession,
diff --git 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
similarity index 98%
rename from 
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
rename to 
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
index a90d36a02de..2d844007506 100644
--- 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31LegacyHoodieParquetFileFormat.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import 
org.apache.spark.sql.execution.datasources.parquet.Spark31HoodieParquetFileFormat.{createParquetFilters,
 pruneInternalSchema, rebuildFilterFromParquet}
+import 
org.apache.spark.sql.execution.datasources.parquet.Spark31LegacyHoodieParquetFileFormat.{createParquetFilters,
 pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, 
PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -61,7 +61,7 @@ import java.net.URI
  *   <li>Schema on-read</li>
  * </ol>
  */
-class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: 
Boolean) extends ParquetFileFormat {
+class Spark31LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
 
   override def buildReaderWithPartitionValues(sparkSession: SparkSession,
                                               dataSchema: StructType,
@@ -343,7 +343,7 @@ class Spark31HoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Bo
   }
 }
 
-object Spark31HoodieParquetFileFormat {
+object Spark31LegacyHoodieParquetFileFormat {
 
   def pruneInternalSchema(internalSchemaStr: String, requiredSchema: 
StructType): String = {
     val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
diff --git 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
index f76128d4f81..d4624625d75 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
@@ -19,14 +19,18 @@
 package org.apache.spark.sql
 
 import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.SparkHoodieTableFileIndex
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt
 import org.apache.spark.sql.catalyst.analysis.ResolvedTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, ProjectionOverSchema}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable, Project}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.execution.command.RepairTableCommand
+import 
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.types.StructType
 
 object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -45,6 +49,16 @@ object HoodieSpark32CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
     }
   }
 
+  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
+    plan match {
+      case s@ScanOperation(_, _,
+      l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if 
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && 
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
+        fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = 
true
+        
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, 
fs.sparkSession.sessionState.analyzer.resolver), s)
+      case _ => plan
+    }
+  }
+
   override def projectOverSchema(schema: StructType, output: AttributeSet): 
ProjectionOverSchema = {
     val klass = classOf[ProjectionOverSchema]
     checkArgument(klass.getConstructors.length == 1)
diff --git 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
index f07d0ccdc63..3a5812a5faa 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
@@ -30,9 +30,9 @@ import 
org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
 import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark32HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark32LegacyHoodieParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, 
HoodieSpark32PartitionedFileUtils, HoodieSparkPartitionedFileUtils, 
PartitionedFile}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
 import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, 
HoodieSpark3_2ExtendedSqlParser}
 import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, 
StructType}
@@ -84,8 +84,8 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
   override def createExtendedSparkParser(spark: SparkSession, delegate: 
ParserInterface): HoodieExtendedParserInterface =
     new HoodieSpark3_2ExtendedSqlParser(spark, delegate)
 
-  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): 
Option[ParquetFileFormat] = {
-    Some(new Spark32HoodieParquetFileFormat(appendPartitionValues))
+  override def createLegacyHoodieParquetFileFormat(appendPartitionValues: 
Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark32LegacyHoodieParquetFileFormat(appendPartitionValues))
   }
 
   override def createHoodieFileScanRDD(sparkSession: SparkSession,
diff --git 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
similarity index 98%
rename from 
hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
rename to 
hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
index f6eb5da13b5..c88c35b5eeb 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32LegacyHoodieParquetFileFormat.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import 
org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat._
+import 
org.apache.spark.sql.execution.datasources.parquet.Spark32LegacyHoodieParquetFileFormat._
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, 
PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -60,7 +60,7 @@ import java.net.URI
  * <li>Schema on-read</li>
  * </ol>
  */
-class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: 
Boolean) extends ParquetFileFormat {
+class Spark32LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
 
   override def buildReaderWithPartitionValues(sparkSession: SparkSession,
                                               dataSchema: StructType,
@@ -172,7 +172,7 @@ class Spark32HoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Bo
           //       and unfortunately won't compile against Spark 3.2.0
           //       However this code is runtime-compatible w/ both Spark 3.2.0 
and >= Spark 3.2.1
           val datetimeRebaseSpec =
-            
DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, 
datetimeRebaseModeInRead)
+          
DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, 
datetimeRebaseModeInRead)
           new ParquetFilters(
             parquetSchema,
             pushDownDate,
@@ -271,7 +271,7 @@ class Spark32HoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Bo
             //       and unfortunately won't compile against Spark 3.2.0
             //       However this code is runtime-compatible w/ both Spark 
3.2.0 and >= Spark 3.2.1
             val int96RebaseSpec =
-              
DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, 
int96RebaseModeInRead)
+            
DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, 
int96RebaseModeInRead)
             val datetimeRebaseSpec =
               
DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, 
datetimeRebaseModeInRead)
             new VectorizedParquetRecordReader(
@@ -409,7 +409,7 @@ class Spark32HoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Bo
   }
 }
 
-object Spark32HoodieParquetFileFormat {
+object Spark32LegacyHoodieParquetFileFormat {
 
   /**
    * NOTE: This method is specific to Spark 3.2.0
@@ -514,4 +514,3 @@ object Spark32HoodieParquetFileFormat {
     }
   }
 }
-
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
index a294dcfdf4b..16f2517d128 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
@@ -18,13 +18,17 @@
 
 package org.apache.spark.sql
 
+import org.apache.hudi.SparkHoodieTableFileIndex
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt
 import org.apache.spark.sql.catalyst.analysis.ResolvedTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, ProjectionOverSchema}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable, Project}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.execution.command.RepairTableCommand
+import 
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.types.StructType
 
 object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -43,6 +47,16 @@ object HoodieSpark33CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
     }
   }
 
+  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
+    plan match {
+      case s@ScanOperation(_, _,
+      l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if 
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && 
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
+        fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = 
true
+        
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, 
fs.sparkSession.sessionState.analyzer.resolver), s)
+      case _ => plan
+    }
+  }
+
   override def projectOverSchema(schema: StructType, output: AttributeSet): 
ProjectionOverSchema =
     ProjectionOverSchema(schema, output)
 
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index 651027f932d..e3d2cc9cd18 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -30,9 +30,9 @@ import 
org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
 import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark33HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark33LegacyHoodieParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, 
HoodieSpark33PartitionedFileUtils, HoodieSparkPartitionedFileUtils, 
PartitionedFile}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
 import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, 
HoodieSpark3_3ExtendedSqlParser}
 import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, 
StructType}
@@ -85,8 +85,8 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
   override def createExtendedSparkParser(spark: SparkSession, delegate: 
ParserInterface): HoodieExtendedParserInterface =
     new HoodieSpark3_3ExtendedSqlParser(spark, delegate)
 
-  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): 
Option[ParquetFileFormat] = {
-    Some(new Spark33HoodieParquetFileFormat(appendPartitionValues))
+  override def createLegacyHoodieParquetFileFormat(appendPartitionValues: 
Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark33LegacyHoodieParquetFileFormat(appendPartitionValues))
   }
 
   override def createHoodieFileScanRDD(sparkSession: SparkSession,
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33HoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
similarity index 99%
rename from 
hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33HoodieParquetFileFormat.scala
rename to 
hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
index 1e60f2ae968..de6cbff90ca 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33HoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import 
org.apache.spark.sql.execution.datasources.parquet.Spark33HoodieParquetFileFormat._
+import 
org.apache.spark.sql.execution.datasources.parquet.Spark33LegacyHoodieParquetFileFormat._
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, 
PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -62,7 +62,7 @@ import java.net.URI
  * <li>Schema on-read</li>
  * </ol>
  */
-class Spark33HoodieParquetFileFormat(private val shouldAppendPartitionValues: 
Boolean) extends ParquetFileFormat {
+class Spark33LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
 
   override def buildReaderWithPartitionValues(sparkSession: SparkSession,
                                               dataSchema: StructType,
@@ -411,7 +411,7 @@ class Spark33HoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Bo
   }
 }
 
-object Spark33HoodieParquetFileFormat {
+object Spark33LegacyHoodieParquetFileFormat {
 
   /**
    * NOTE: This method is specific to Spark 3.2.0
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
index 302fba45590..947a73285f5 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
@@ -18,12 +18,16 @@
 
 package org.apache.spark.sql
 
+import org.apache.hudi.SparkHoodieTableFileIndex
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, ResolvedTable}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, ProjectionOverSchema}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable, Project}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.execution.command.RepairTableCommand
+import 
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.types.StructType
 
 object HoodieSpark34CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -42,6 +46,16 @@ object HoodieSpark34CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
     }
   }
 
+  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
+    plan match {
+      case s@ScanOperation(_, _, _,
+      l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if 
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && 
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
+        fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = 
true
+        
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, 
fs.sparkSession.sessionState.analyzer.resolver), s)
+      case _ => plan
+    }
+  }
+
   override def projectOverSchema(schema: StructType, output: AttributeSet): 
ProjectionOverSchema =
     ProjectionOverSchema(schema, output)
 
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index 3bd8256c3aa..0ae5ef3dbf3 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -29,14 +29,14 @@ import 
org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
 import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark34HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark34LegacyHoodieParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, 
HoodieSpark34PartitionedFileUtils, HoodieSparkPartitionedFileUtils, 
PartitionedFile}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
 import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, 
HoodieSpark3_4ExtendedSqlParser}
 import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, 
StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatchRow
-import org.apache.spark.sql.{HoodieCatalystExpressionUtils, 
HoodieCatalystPlansUtils, HoodieSchemaUtils, HoodieSpark34CatalogUtils, 
HoodieSpark34CatalystExpressionUtils, HoodieSpark34CatalystPlanUtils, 
HoodieSpark34SchemaUtils, HoodieSpark3CatalogUtils, SparkSession, 
SparkSessionExtensions}
+import org.apache.spark.sql._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel._
 
@@ -85,8 +85,8 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
   override def createExtendedSparkParser(spark: SparkSession, delegate: 
ParserInterface): HoodieExtendedParserInterface =
     new HoodieSpark3_4ExtendedSqlParser(spark, delegate)
 
-  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): 
Option[ParquetFileFormat] = {
-    Some(new Spark34HoodieParquetFileFormat(appendPartitionValues))
+  override def createLegacyHoodieParquetFileFormat(appendPartitionValues: 
Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark34LegacyHoodieParquetFileFormat(appendPartitionValues))
   }
 
   override def createHoodieFileScanRDD(sparkSession: SparkSession,
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34HoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
similarity index 99%
rename from 
hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34HoodieParquetFileFormat.scala
rename to 
hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
index 4bd3e2fc345..6de8ded06ec 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34HoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
@@ -41,7 +41,7 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.WholeStageCodegenExec
-import 
org.apache.spark.sql.execution.datasources.parquet.Spark34HoodieParquetFileFormat._
+import 
org.apache.spark.sql.execution.datasources.parquet.Spark34LegacyHoodieParquetFileFormat._
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, 
PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -57,7 +57,7 @@ import org.apache.spark.util.SerializableConfiguration
  *   <li>Schema on-read</li>
  * </ol>
  */
-class Spark34HoodieParquetFileFormat(private val shouldAppendPartitionValues: 
Boolean) extends ParquetFileFormat {
+class Spark34LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
 
   override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
     val conf = sparkSession.sessionState.conf
@@ -427,7 +427,7 @@ class Spark34HoodieParquetFileFormat(private val 
shouldAppendPartitionValues: Bo
   }
 }
 
-object Spark34HoodieParquetFileFormat {
+object Spark34LegacyHoodieParquetFileFormat {
 
   /**
    * NOTE: This method is specific to Spark 3.2.0
diff --git a/rfc/README.md b/rfc/README.md
index 544e1c83de9..0c5475233de 100644
--- a/rfc/README.md
+++ b/rfc/README.md
@@ -107,4 +107,4 @@ The list of all RFCs can be found here.
 | 69         | [Hudi 1.x](./rfc-69/rfc-69.md)                                  
                                                                                
                                                                     | `UNDER 
REVIEW` |
 | 70         | [Hudi Reverse Streamer](./rfc/rfc-70/rfc-70.md)                 
                                                                                
                                                                     | `UNDER 
REVIEW` |
 | 71         | [Enhance OCC conflict detection](./rfc/rfc-71/rfc-71.md)        
                                                                                
                                                                     | `UNDER 
REVIEW` |
-| 72         | [Redesign Hudi-Spark Integration](./rfc/rfc-72/rfc-72.md)       
                                                                                
                                                                     | `IN 
PROGRESS`  |
\ No newline at end of file
+| 72         | [Redesign Hudi-Spark Integration](./rfc/rfc-72/rfc-72.md)       
                                                                                
                                                                     | 
`ONGOING`      |
\ No newline at end of file


Reply via email to