This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fdc42928c90 [HUDI-7215] Delete NewHoodieParquetFileFormat (#10304)
fdc42928c90 is described below

commit fdc42928c905032fda20a208d5d3a7cdb0e377f7
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Dec 13 16:30:03 2023 -0500

    [HUDI-7215] Delete NewHoodieParquetFileFormat (#10304)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   9 -
 .../main/scala/org/apache/hudi/DefaultSource.scala |  66 +---
 .../hudi/HoodieHadoopFsRelationFactory.scala       |  62 ++--
 .../apache/hudi/HoodieSparkFileFormatUtils.scala   | 233 -------------
 .../parquet/NewHoodieParquetFileFormat.scala       | 378 ---------------------
 .../hudi/TestHoodieMergeHandleWithSparkMerger.java |   3 -
 .../apache/hudi/functional/TestBootstrapRead.java  |   6 +-
 .../hudi/functional/TestBootstrapReadBase.java     |  12 -
 .../functional/TestNewHoodieParquetFileFormat.java |   5 -
 .../hudi/TestAvroSchemaResolutionSupport.scala     |   5 -
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |   2 -
 .../hudi/functional/TestMORDataSourceStorage.scala |   1 -
 .../functional/TestPartialUpdateAvroPayload.scala  |   3 +-
 .../sql/hudi/TestPartialUpdateForMergeInto.scala   |   5 -
 .../hudi/utilities/sources/HoodieIncrSource.java   |   4 +-
 ...TestHoodieDeltaStreamerSchemaEvolutionBase.java |   3 -
 16 files changed, 40 insertions(+), 757 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index ac60606a4bc..d618a604388 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -86,15 +86,6 @@ object DataSourceReadOptions {
       s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL}) 
or skip merging altogether" +
       s"${REALTIME_SKIP_MERGE_OPT_VAL}")
 
-  val USE_NEW_HUDI_PARQUET_FILE_FORMAT: ConfigProperty[String] = ConfigProperty
-    .key("hoodie.datasource.read.use.new.parquet.file.format")
-    .defaultValue("true")
-    .markAdvanced()
-    .sinceVersion("0.14.0")
-    .withDocumentation("Read using the new Hudi parquet file format. The new 
Hudi parquet file format is " +
-      "introduced as an experimental feature in 0.14.0. Currently, the new 
Hudi parquet file format only applies " +
-      "to bootstrap and MOR queries. Schema evolution is also not supported by 
the new file format.")
-
   val READ_PATHS: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.read.paths")
     .noDefaultValue()
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index ac8286b1bde..222a447f9f5 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, 
OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
 import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieReaderConfig}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, 
MERGE_ON_READ}
 import org.apache.hudi.common.model.{HoodieTableType, WriteConcurrencyMode}
@@ -235,15 +236,11 @@ object DefaultSource {
       Option(schema)
     }
 
-    val useNewParquetFileFormat =
-      parameters.getOrElse(
-        USE_NEW_HUDI_PARQUET_FILE_FORMAT.key,
-        USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean &&
-      !metaClient.isMetadataTable &&
-      (globPaths == null || globPaths.isEmpty) &&
-      parameters.getOrElse(REALTIME_MERGE.key(), REALTIME_MERGE.defaultValue())
-        .equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL)
-
+    val useNewParquetFileFormat = 
parameters.getOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
+      
HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue().toString).toBoolean 
&&
+      !metaClient.isMetadataTable && (globPaths == null || globPaths.isEmpty) 
&&
+      !parameters.getOrDefault(SCHEMA_EVOLUTION_ENABLED.key(), 
SCHEMA_EVOLUTION_ENABLED.defaultValue().toString).toBoolean &&
+      parameters.getOrElse(REALTIME_MERGE.key(), 
REALTIME_MERGE.defaultValue()).equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL)
     if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() 
== 0) {
       new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters, 
Some(schema)))
     } else if (isCdcQuery) {
@@ -259,57 +256,35 @@ object DefaultSource {
         CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
       }
     } else {
-      lazy val fileFormatUtils = if ((isMultipleBaseFileFormatsEnabled && 
!isBootstrappedTable)
-        || (useNewParquetFileFormat)) {
-        val formatUtils = new HoodieSparkFileFormatUtils(sqlContext, 
metaClient, parameters, userSchema)
-        if (formatUtils.hasSchemaOnRead) Option.empty else Some(formatUtils)
-      } else {
-        Option.empty
-      }
-
-      if (isMultipleBaseFileFormatsEnabled) {
-        if (isBootstrappedTable) {
-          throw new HoodieException(s"Multiple base file formats are not 
supported for bootstrapped table")
-        }
-        resolveMultiFileFormatRelation(tableType, queryType, 
fileFormatUtils.get)
-      }
 
       (tableType, queryType, isBootstrappedTable) match {
         case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
              (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
              (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
-          if (fileFormatUtils.isDefined) {
+          if (useNewParquetFileFormat) {
             new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
               sqlContext, metaClient, parameters, userSchema, isBootstrap = 
false).build()
           } else {
             resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, 
metaClient, parameters)
           }
         case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-          if (fileFormatUtils.isDefined) {
+          if (useNewParquetFileFormat) {
             new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
               sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
           } else {
             new IncrementalRelation(sqlContext, parameters, userSchema, 
metaClient)
           }
 
-        case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
-          if (fileFormatUtils.isDefined) {
+        case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, _) =>
+          if (useNewParquetFileFormat) {
             new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
-              sqlContext, metaClient, parameters, userSchema, isBootstrap = 
false).build()
+              sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
           } else {
             new MergeOnReadSnapshotRelation(sqlContext, parameters, 
metaClient, globPaths, userSchema)
           }
 
-        case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
-          if (fileFormatUtils.isDefined) {
-            new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
-              sqlContext, metaClient, parameters, userSchema, isBootstrap = 
true).build()
-          } else {
-            HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, 
metaClient, parameters)
-          }
-
         case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-          if (fileFormatUtils.isDefined) {
+          if (useNewParquetFileFormat) {
             new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
               sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
           } else {
@@ -317,7 +292,7 @@ object DefaultSource {
           }
 
         case (_, _, true) =>
-          if (fileFormatUtils.isDefined) {
+          if (useNewParquetFileFormat) {
             new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
               sqlContext, metaClient, parameters, userSchema, isBootstrap = 
true).build()
           } else {
@@ -368,21 +343,6 @@ object DefaultSource {
     }
   }
 
-  private def resolveMultiFileFormatRelation(tableType: HoodieTableType,
-                                             queryType: String,
-                                             fileFormatUtils: 
HoodieSparkFileFormatUtils): BaseRelation = {
-    (tableType, queryType) match {
-      case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL) |
-           (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) =>
-        fileFormatUtils.getHadoopFsRelation(isMOR = false, isBootstrap = false)
-      case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) |
-           (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) =>
-        fileFormatUtils.getHadoopFsRelation(isMOR = true, isBootstrap = false)
-      case (_, _) =>
-        throw new HoodieException(s"Multiple base file formats not supported 
for query type : $queryType for tableType: $tableType")
-    }
-  }
-
   private def resolveSchema(metaClient: HoodieTableMetaClient,
                             parameters: Map[String, String],
                             schema: Option[StructType]): StructType = {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index aee15189545..34088f2e699 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -34,16 +34,16 @@ import org.apache.hudi.common.util.{ConfigUtils, 
StringUtils}
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
 import org.apache.hudi.metadata.HoodieTableMetadataUtil
 import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.{SQLContext, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import 
org.apache.spark.sql.execution.datasources.parquet.{HoodieFileGroupReaderBasedParquetFileFormat,
 NewHoodieParquetFileFormat, ParquetFileFormat}
+import 
org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat
 import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, 
FileStatusCache, HadoopFsRelation, HoodieMultipleBaseFileFormat}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{SQLContext, SparkSession}
 
 import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
@@ -169,8 +169,6 @@ abstract class HoodieBaseHadoopFsRelationFactory(val 
sqlContext: SQLContext,
   protected lazy val mandatoryFieldsForMerging: Seq[String] =
     Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
 
-  protected lazy val fileGroupReaderEnabled: Boolean = 
checkIfAConfigurationEnabled(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
-
   protected lazy val shouldUseRecordPosition: Boolean = 
checkIfAConfigurationEnabled(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS)
 
   protected def queryTimestamp: Option[String] =
@@ -234,19 +232,15 @@ class 
HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext:
   override def buildFileIndex(): FileIndex = fileIndex
 
   override def buildFileFormat(): FileFormat = {
-    if (fileGroupReaderEnabled) {
-      new HoodieFileGroupReaderBasedParquetFileFormat(
-        tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-        true, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
-    } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
+    if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
       new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
         
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
         metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
true, false, Seq.empty)
     } else {
-      new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
true, isBootstrap, false, Seq.empty)
+      new HoodieFileGroupReaderBasedParquetFileFormat(
+        tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+        true, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
     }
   }
 
@@ -281,21 +275,16 @@ class 
HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContex
     sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), true, true)
 
   override def buildFileFormat(): FileFormat = {
-    if (fileGroupReaderEnabled) {
-      new HoodieFileGroupReaderBasedParquetFileFormat(
-        tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-        true, isBootstrap, true, shouldUseRecordPosition, 
fileIndex.getRequiredFilters)
-    } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
+    if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
       new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
         
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
         metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
         true, true, fileIndex.getRequiredFilters)
     } else {
-      new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+      new HoodieFileGroupReaderBasedParquetFileFormat(
+        tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
         metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-        true, isBootstrap, true, fileIndex.getRequiredFilters)
+        true, isBootstrap, true, shouldUseRecordPosition, 
fileIndex.getRequiredFilters)
     }
   }
 }
@@ -318,19 +307,15 @@ class 
HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val sqlContext:
     shouldEmbedFileSlices = true)
 
   override def buildFileFormat(): FileFormat = {
-    if (fileGroupReaderEnabled) {
-      new HoodieFileGroupReaderBasedParquetFileFormat(
-        tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-        false, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
-    } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
+    if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
       new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
         
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
         metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
false, false, Seq.empty)
     } else {
-      new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
false, isBootstrap, false, Seq.empty)
+      new HoodieFileGroupReaderBasedParquetFileFormat(
+        tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
+        false, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
     }
   }
 }
@@ -349,21 +334,16 @@ class 
HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContex
     sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), false, isBootstrap)
 
   override def buildFileFormat(): FileFormat = {
-    if (fileGroupReaderEnabled) {
-      new HoodieFileGroupReaderBasedParquetFileFormat(
-        tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-        false, isBootstrap, true, shouldUseRecordPosition, 
fileIndex.getRequiredFilters)
-    } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
+    if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
       new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
         
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
         metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
         false, true, fileIndex.getRequiredFilters)
     } else {
-      new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+      new HoodieFileGroupReaderBasedParquetFileFormat(
+        tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
         metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-        false, isBootstrap, true, fileIndex.getRequiredFilters)
+        false, isBootstrap, true, shouldUseRecordPosition, 
fileIndex.getRequiredFilters)
     }
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
deleted file mode 100644
index fe1645d3b60..00000000000
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.avro.Schema
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieBaseRelation._
-import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.common.config.{ConfigProperty, HoodieReaderConfig}
-import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.table.timeline.HoodieTimeline
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
-import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
-import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
-import org.apache.spark.sql.catalyst.analysis.Resolver
-import 
org.apache.spark.sql.execution.datasources.parquet.{HoodieFileGroupReaderBasedParquetFileFormat,
 NewHoodieParquetFileFormat}
-import org.apache.spark.sql.execution.datasources.{FileStatusCache, 
HadoopFsRelation, HoodieMultipleBaseFileFormat}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
-import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{SQLContext, SparkSession}
-
-import scala.collection.JavaConverters._
-import scala.util.{Failure, Success, Try}
-
-class HoodieSparkFileFormatUtils(val sqlContext: SQLContext,
-                                 val metaClient: HoodieTableMetaClient,
-                                 val optParamsInput: Map[String, String],
-                                 private val schemaSpec: Option[StructType]) 
extends SparkAdapterSupport {
-  protected val sparkSession: SparkSession = sqlContext.sparkSession
-
-  protected val optParams: Map[String, String] = optParamsInput.filter(kv => 
!kv._1.equals(DATA_QUERIES_ONLY.key()))
-  protected def tableName: String = metaClient.getTableConfig.getTableName
-
-  protected lazy val resolver: Resolver = 
sparkSession.sessionState.analyzer.resolver
-
-  private lazy val metaFieldNames = 
HoodieRecord.HOODIE_META_COLUMNS.asScala.toSet
-
-  protected lazy val conf: Configuration = new 
Configuration(sqlContext.sparkContext.hadoopConfiguration)
-  protected lazy val jobConf = new JobConf(conf)
-
-  protected lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig
-
-  protected lazy val basePath: Path = metaClient.getBasePathV2
-
-  protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: 
Option[InternalSchema]) = {
-    val schemaResolver = new TableSchemaResolver(metaClient)
-    val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(optParams, 
sparkSession)) {
-      None
-    } else {
-      Try {
-        
specifiedQueryTimestamp.map(schemaResolver.getTableInternalSchemaFromCommitMetadata)
-          .getOrElse(schemaResolver.getTableInternalSchemaFromCommitMetadata)
-      } match {
-        case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
-        case Failure(e) =>
-          None
-      }
-    }
-
-    val (name, namespace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
-    val avroSchema = internalSchemaOpt.map { is =>
-      AvroInternalSchemaConverter.convert(is, namespace + "." + name)
-    } orElse {
-      specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
-    } orElse {
-      schemaSpec.map(s => convertToAvroSchema(s, tableName))
-    } getOrElse {
-      Try(schemaResolver.getTableAvroSchema) match {
-        case Success(schema) => schema
-        case Failure(e) =>
-          throw new HoodieSchemaException("Failed to fetch schema from the 
table")
-      }
-    }
-
-    (avroSchema, internalSchemaOpt)
-  }
-
-  protected lazy val tableStructSchema: StructType = {
-    val converted = 
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
-    val metaFieldMetadata = sparkAdapter.createCatalystMetadataForMetaField
-
-    // NOTE: Here we annotate meta-fields with corresponding metadata such 
that Spark (>= 3.2)
-    //       is able to recognize such fields as meta-fields
-    StructType(converted.map { field =>
-      if (metaFieldNames.exists(metaFieldName => resolver(metaFieldName, 
field.name))) {
-        field.copy(metadata = metaFieldMetadata)
-      } else {
-        field
-      }
-    })
-  }
-
-  protected lazy val preCombineFieldOpt: Option[String] =
-    Option(tableConfig.getPreCombineField)
-      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) 
match {
-      // NOTE: This is required to compensate for cases when empty string is 
used to stub
-      //       property value to avoid it being set with the default value
-      // TODO(HUDI-3456) cleanup
-      case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
-      case _ => None
-    }
-
-  protected def timeline: HoodieTimeline =
-  // NOTE: We're including compaction here since it's not considering a 
"commit" operation
-    metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants
-
-  protected lazy val recordKeyField: String =
-    if (tableConfig.populateMetaFields()) {
-      HoodieRecord.RECORD_KEY_METADATA_FIELD
-    } else {
-      val keyFields = tableConfig.getRecordKeyFields.get()
-      checkState(keyFields.length == 1)
-      keyFields.head
-    }
-
-  private def queryTimestamp: Option[String] =
-    
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))
-
-  protected lazy val specifiedQueryTimestamp: Option[String] =
-    optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
-      .map(HoodieSqlCommonUtils.formatQueryInstant)
-
-  private def getConfigValue(config: ConfigProperty[String],
-                             defaultValueOption: Option[String] = 
Option.empty): String = {
-    optParams.getOrElse(config.key(),
-      sqlContext.getConf(config.key(), 
defaultValueOption.getOrElse(config.defaultValue())))
-  }
-
-  private def checkIfAConfigurationEnabled(config: 
ConfigProperty[java.lang.Boolean],
-                             defaultValueOption: Option[String] = 
Option.empty): Boolean = {
-    optParams.getOrElse(config.key(),
-      sqlContext.getConf(config.key(), 
defaultValueOption.getOrElse(String.valueOf(config.defaultValue())))).toBoolean
-  }
-
-  protected val mergeType: String = 
optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
-    DataSourceReadOptions.REALTIME_MERGE.defaultValue)
-
-  protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
-    // Controls whether partition columns (which are the source for the 
partition path values) should
-    // be omitted from persistence in the data files. On the read path it 
affects whether partition values (values
-    // of partition columns) will be read from the data file or extracted from 
partition path
-    val shouldOmitPartitionColumns = 
metaClient.getTableConfig.shouldDropPartitionColumns && 
partitionColumns.nonEmpty
-    val shouldExtractPartitionValueFromPath =
-      
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
-        
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
-    val shouldUseBootstrapFastRead = 
optParams.getOrElse(DATA_QUERIES_ONLY.key(), "false").toBoolean
-
-    shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath || 
shouldUseBootstrapFastRead
-  }
-
-  protected lazy val mandatoryFieldsForMerging: Seq[String] =
-    Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
-
-  protected lazy val partitionColumns: Array[String] = 
tableConfig.getPartitionFields.orElse(Array.empty)
-
-  def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined
-
-  def getHadoopFsRelation(isMOR: Boolean, isBootstrap: Boolean): BaseRelation 
= {
-    val fileIndex = HoodieFileIndex(
-      sparkSession, metaClient, Some(tableStructSchema), optParams, 
FileStatusCache.getOrCreate(sparkSession), isMOR, shouldEmbedFileSlices = true)
-    val recordMergerImpls = 
ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList
-    val recordMergerStrategy = 
getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY,
-      Option(metaClient.getTableConfig.getRecordMergerStrategy))
-
-    val fileGroupReaderEnabled = 
checkIfAConfigurationEnabled(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
-    val shouldUseRecordPosition = 
checkIfAConfigurationEnabled(HoodieWriteConfig.WRITE_RECORD_POSITIONS)
-
-    val tableState = // Subset of the state of table's configuration as of at 
the time of the query
-      HoodieTableState(
-        tablePath = basePath.toString,
-        latestCommitTimestamp = queryTimestamp,
-        recordKeyField = recordKeyField,
-        preCombineFieldOpt = preCombineFieldOpt,
-        usesVirtualKeys = !tableConfig.populateMetaFields(),
-        recordPayloadClassName = tableConfig.getPayloadClass,
-        metadataConfig = fileIndex.metadataConfig,
-        recordMergerImpls = recordMergerImpls,
-        recordMergerStrategy = recordMergerStrategy
-      )
-
-    val mandatoryFields = if (isMOR) {
-      mandatoryFieldsForMerging
-    } else {
-      Seq.empty
-    }
-
-    val fileFormat = if (fileGroupReaderEnabled) {
-      new HoodieFileGroupReaderBasedParquetFileFormat(
-        tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-        isMOR, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
-    } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
-      new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
-        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
isMOR, false, Seq.empty)
-    } else {
-      new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-        isMOR, isBootstrap, false, Seq.empty)
-    }
-
-    HadoopFsRelation(
-      location = fileIndex,
-      partitionSchema = fileIndex.partitionSchema,
-      dataSchema = fileIndex.dataSchema,
-      bucketSpec = None,
-      fileFormat = fileFormat,
-      optParams)(sparkSession)
-  }
-}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
deleted file mode 100644
index 0751d99f79c..00000000000
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++ /dev/null
@@ -1,378 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import 
org.apache.hudi.DataSourceReadOptions.{REALTIME_PAYLOAD_COMBINE_OPT_VAL, 
REALTIME_SKIP_MERGE_OPT_VAL}
-import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
-import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{BaseFile, FileSlice, HoodieLogFile, 
HoodieRecord}
-import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.{HoodieBaseRelation, HoodiePartitionFileSliceMapping, 
HoodieSparkUtils, HoodieTableSchema, HoodieTableState, LogFileIterator, 
MergeOnReadSnapshotRelation, RecordMergingFileIterator, SkipMergeIterator, 
SparkAdapterSupport}
-import org.apache.spark.broadcast.Broadcast
-import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
-import org.apache.spark.util.SerializableConfiguration
-
-import scala.collection.mutable
-import scala.jdk.CollectionConverters.asScalaIteratorConverter
-
-/**
- * This class does bootstrap and MOR merging so that we can use hadoopfs 
relation.
- */
-class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState],
-                                 tableSchema: Broadcast[HoodieTableSchema],
-                                 tableName: String,
-                                 mergeType: String,
-                                 mandatoryFields: Seq[String],
-                                 isMOR: Boolean,
-                                 isBootstrap: Boolean,
-                                 isIncremental: Boolean,
-                                 requiredFilters: Seq[Filter]
-                                ) extends ParquetFileFormat with 
SparkAdapterSupport with HoodieFormatTrait {
-
-  def getRequiredFilters: Seq[Filter] = requiredFilters
-
-  override def isSplitable(sparkSession: SparkSession,
-                           options: Map[String, String],
-                           path: Path): Boolean = {
-    false
-  }
-
-  /**
-   * Support batch needs to remain consistent, even if one side of a bootstrap 
merge can support
-   * while the other side can't
-   */
-  private var supportBatchCalled = false
-  private var supportBatchResult = false
-  override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
-    if (!supportBatchCalled || supportBatchResult) {
-      supportBatchCalled = true
-      supportBatchResult = !isIncremental && !isMOR && 
super.supportBatch(sparkSession, schema)
-    }
-    supportBatchResult
-  }
-
-  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
-                                              dataSchema: StructType,
-                                              partitionSchema: StructType,
-                                              requiredSchema: StructType,
-                                              filters: Seq[Filter],
-                                              options: Map[String, String],
-                                              hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
-
-    val outputSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
-
-    val requiredSchemaWithMandatory = if (!isMOR || 
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
-      //add mandatory fields to required schema
-      val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
-      for (field <- mandatoryFields) {
-        if (requiredSchema.getFieldIndex(field).isEmpty) {
-          val fieldToAdd = 
dataSchema.fields(dataSchema.getFieldIndex(field).get)
-          added.append(fieldToAdd)
-        }
-      }
-      val addedFields = StructType(added.toArray)
-      StructType(requiredSchema.toArray ++ addedFields.fields)
-    } else {
-      dataSchema
-    }
-
-    val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f 
=> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
-    val requiredMeta = StructType(requiredSchemaSplits._1)
-    val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
-    val needMetaCols = requiredMeta.nonEmpty
-    val needDataCols = requiredWithoutMeta.nonEmpty
-    // note: this is only the output of the bootstrap merge if isMOR. If it is 
only bootstrap then the
-    // output will just be outputSchema
-    val bootstrapReaderOutput = StructType(requiredMeta.fields ++ 
requiredWithoutMeta.fields)
-
-    val skeletonReaderAppend = needMetaCols && isBootstrap && !(needDataCols 
|| isMOR) && partitionSchema.nonEmpty
-    val bootstrapBaseAppend = needDataCols && isBootstrap && !isMOR && 
partitionSchema.nonEmpty
-
-    val (baseFileReader, preMergeBaseFileReader, skeletonReader, 
bootstrapBaseReader) = buildFileReaders(sparkSession,
-      dataSchema, partitionSchema, if (isIncremental) 
requiredSchemaWithMandatory else requiredSchema,
-      filters, options, hadoopConf, requiredSchemaWithMandatory,
-      requiredWithoutMeta, requiredMeta)
-
-    val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
-    (file: PartitionedFile) => {
-      file.partitionValues match {
-        case fileSliceMapping: HoodiePartitionFileSliceMapping =>
-          val filePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
-          if (FSUtils.isLogFile(filePath)) {
-            //no base file
-            val fileSlice = 
fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
-            val logFiles = getLogFilesFromSlice(fileSlice)
-            val outputAvroSchema = 
HoodieBaseRelation.convertToAvroSchema(outputSchema, tableName)
-            new LogFileIterator(logFiles, filePath.getParent, 
tableSchema.value, outputSchema, outputAvroSchema,
-              tableState.value, broadcastedHadoopConf.value.value)
-          } else {
-            //We do not broadcast the slice if it has no log files or 
bootstrap base
-            fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName)) 
match {
-              case Some(fileSlice) =>
-                val hoodieBaseFile = fileSlice.getBaseFile.get()
-                val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
-                val partitionValues = fileSliceMapping.getPartitionValues
-                val logFiles = if (isMOR) getLogFilesFromSlice(fileSlice) else 
List.empty
-                if (requiredSchemaWithMandatory.isEmpty) {
-                  val baseFile = createPartitionedFile(partitionValues, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
-                  baseFileReader(baseFile)
-                } else if (bootstrapFileOpt.isPresent) {
-                  val bootstrapIterator = 
buildBootstrapIterator(skeletonReader, bootstrapBaseReader,
-                    skeletonReaderAppend, bootstrapBaseAppend, 
bootstrapFileOpt.get(), hoodieBaseFile, partitionValues,
-                    needMetaCols, needDataCols)
-                  (isMOR, logFiles.nonEmpty) match {
-                    case (true, true) => 
buildMergeOnReadIterator(bootstrapIterator, logFiles, filePath.getParent,
-                      bootstrapReaderOutput, requiredSchemaWithMandatory, 
outputSchema, partitionSchema, partitionValues,
-                      broadcastedHadoopConf.value.value)
-                    case (true, false) => 
appendPartitionAndProject(bootstrapIterator, bootstrapReaderOutput,
-                      partitionSchema, outputSchema, partitionValues)
-                    case (false, false) => bootstrapIterator
-                    case (false, true) => throw new 
IllegalStateException("should not be log files if not mor table")
-                  }
-                } else {
-                  if (logFiles.nonEmpty) {
-                    val baseFile = createPartitionedFile(InternalRow.empty, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
-                    buildMergeOnReadIterator(preMergeBaseFileReader(baseFile), 
logFiles, filePath.getParent, requiredSchemaWithMandatory,
-                      requiredSchemaWithMandatory, outputSchema, 
partitionSchema, partitionValues, broadcastedHadoopConf.value.value)
-                  } else {
-                    throw new IllegalStateException("should not be here since 
file slice should not have been broadcasted since it has no log or data files")
-                    //baseFileReader(baseFile)
-                  }
-                }
-              case _ if isIncremental =>
-                projectSchema(baseFileReader(file), 
StructType(requiredSchemaWithMandatory.fields ++ partitionSchema.fields), 
outputSchema)
-              case _ => baseFileReader(file)
-            }
-          }
-        case _ if isIncremental =>
-          projectSchema(baseFileReader(file), 
StructType(requiredSchemaWithMandatory.fields ++ partitionSchema.fields), 
outputSchema)
-        case _ => baseFileReader(file)
-      }
-    }
-  }
-
-  /**
-   * Build file readers to read individual physical files
-   */
- protected def buildFileReaders(sparkSession: SparkSession, dataSchema: 
StructType, partitionSchema: StructType,
-                       requiredSchema: StructType, filters: Seq[Filter], 
options: Map[String, String],
-                       hadoopConf: Configuration, requiredSchemaWithMandatory: 
StructType,
-                       requiredWithoutMeta: StructType, requiredMeta: 
StructType):
-  (PartitionedFile => Iterator[InternalRow],
-    PartitionedFile => Iterator[InternalRow],
-    PartitionedFile => Iterator[InternalRow],
-    PartitionedFile => Iterator[InternalRow]) = {
-
-    //file reader when you just read a hudi parquet file and don't do any 
merging
-    val baseFileReader = if (isIncremental) {
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, 
partitionSchema, requiredSchemaWithMandatory,
-        filters ++ requiredFilters, options, new Configuration(hadoopConf))
-    } else {
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, 
partitionSchema, requiredSchema,
-        filters ++ requiredFilters, options, new Configuration(hadoopConf))
-    }
-
-    //file reader for reading a hudi base file that needs to be merged with 
log files
-   val recordKeyRelatedFilters = getRecordKeyRelatedFilters(filters, 
tableState.value.recordKeyField)
-    val preMergeBaseFileReader = if (isMOR) {
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, 
StructType(Seq.empty),
-        requiredSchemaWithMandatory, requiredFilters ++ 
recordKeyRelatedFilters, options, new Configuration(hadoopConf))
-    } else {
-      _: PartitionedFile => Iterator.empty
-    }
-
-    //Rules for appending partitions and filtering in the bootstrap readers:
-    // 1. if it is mor, we don't want to filter data or append partitions
-    // 2. if we need to merge the bootstrap base and skeleton files then we 
cannot filter
-    // 3. if we need to merge the bootstrap base and skeleton files then we 
should never append partitions to the
-    //    skeleton reader
-
-    val needMetaCols = requiredMeta.nonEmpty
-    val needDataCols = requiredWithoutMeta.nonEmpty
-
-    //file reader for bootstrap skeleton files
-    val skeletonReader = if (needMetaCols && isBootstrap) {
-      if (needDataCols || isMOR) {
-        // no filter and no append
-        super.buildReaderWithPartitionValues(sparkSession, 
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
-          requiredMeta, Seq.empty, options, new Configuration(hadoopConf))
-      } else {
-        // filter and append
-        super.buildReaderWithPartitionValues(sparkSession, 
HoodieSparkUtils.getMetaSchema, partitionSchema,
-          requiredMeta, filters, options, new Configuration(hadoopConf))
-      }
-    } else {
-      _: PartitionedFile => Iterator.empty
-    }
-
-    //file reader for bootstrap base files
-    val bootstrapBaseReader = if (needDataCols && isBootstrap) {
-      val dataSchemaWithoutMeta = StructType(dataSchema.fields.filterNot(sf => 
isMetaField(sf.name)))
-      if (isMOR) {
-        // no filter and no append
-        super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
-          Seq.empty, options, new Configuration(hadoopConf))
-      } else if (needMetaCols) {
-        // no filter but append
-        super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
-          Seq.empty, options, new Configuration(hadoopConf))
-      } else {
-        // filter and append
-        super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
-          filters ++ requiredFilters, options, new Configuration(hadoopConf))
-      }
-    } else {
-      _: PartitionedFile => Iterator.empty
-    }
-
-    (baseFileReader, preMergeBaseFileReader, skeletonReader, 
bootstrapBaseReader)
-  }
-
-  /**
-   * Create iterator for a file slice that has bootstrap base and skeleton file
-   */
-  protected def buildBootstrapIterator(skeletonReader: PartitionedFile => 
Iterator[InternalRow],
-                             bootstrapBaseReader: PartitionedFile => 
Iterator[InternalRow],
-                             skeletonReaderAppend: Boolean, 
bootstrapBaseAppend: Boolean,
-                             bootstrapBaseFile: BaseFile, hoodieBaseFile: 
BaseFile,
-                             partitionValues: InternalRow, needMetaCols: 
Boolean,
-                             needDataCols: Boolean): Iterator[InternalRow] = {
-    lazy val skeletonFile = if (skeletonReaderAppend) {
-      createPartitionedFile(partitionValues, hoodieBaseFile.getHadoopPath, 0, 
hoodieBaseFile.getFileLen)
-    } else {
-      createPartitionedFile(InternalRow.empty, hoodieBaseFile.getHadoopPath, 
0, hoodieBaseFile.getFileLen)
-    }
-
-    lazy val dataFile = if (bootstrapBaseAppend) {
-      createPartitionedFile(partitionValues, bootstrapBaseFile.getHadoopPath, 
0, bootstrapBaseFile.getFileLen)
-    } else {
-      createPartitionedFile(InternalRow.empty, 
bootstrapBaseFile.getHadoopPath, 0, bootstrapBaseFile.getFileLen)
-    }
-
-    lazy val skeletonIterator = skeletonReader(skeletonFile)
-    lazy val dataFileIterator = bootstrapBaseReader(dataFile)
-
-    (needMetaCols, needDataCols) match {
-      case (true, true) => doBootstrapMerge(skeletonIterator, dataFileIterator)
-      case (true, false) => skeletonIterator
-      case (false, true) => dataFileIterator
-      case (false, false) => throw new IllegalStateException("should not be 
here if only partition cols are required")
-    }
-  }
-
-  /**
-   * Merge skeleton and data file iterators
-   */
-  protected def doBootstrapMerge(skeletonFileIterator: Iterator[Any], 
dataFileIterator: Iterator[Any]): Iterator[InternalRow] = {
-    new Iterator[Any] {
-      val combinedRow = new JoinedRow()
-
-      override def hasNext: Boolean = {
-        checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
-          "Bootstrap data-file iterator and skeleton-file iterator have to be 
in-sync!")
-        dataFileIterator.hasNext && skeletonFileIterator.hasNext
-      }
-
-      override def next(): Any = {
-        (skeletonFileIterator.next(), dataFileIterator.next()) match {
-          case (s: ColumnarBatch, d: ColumnarBatch) =>
-            val numCols = s.numCols() + d.numCols()
-            val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
-            for (i <- 0 until numCols) {
-              if (i < s.numCols()) {
-                vecs(i) = s.column(i)
-              } else {
-                vecs(i) = d.column(i - s.numCols())
-              }
-            }
-            assert(s.numRows() == d.numRows())
-            sparkAdapter.makeColumnarBatch(vecs, s.numRows())
-          case (_: ColumnarBatch, _: InternalRow) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
-          case (_: InternalRow, _: ColumnarBatch) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
-          case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
-        }
-      }
-    }.asInstanceOf[Iterator[InternalRow]]
-  }
-
-  /**
-   * Create iterator for a file slice that has log files
-   */
-  protected def buildMergeOnReadIterator(iter: Iterator[InternalRow], 
logFiles: List[HoodieLogFile],
-                               partitionPath: Path, inputSchema: StructType, 
requiredSchemaWithMandatory: StructType,
-                               outputSchema: StructType, partitionSchema: 
StructType, partitionValues: InternalRow,
-                               hadoopConf: Configuration): 
Iterator[InternalRow] = {
-
-    val requiredAvroSchema = 
HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, tableName)
-    val morIterator =  mergeType match {
-      case REALTIME_SKIP_MERGE_OPT_VAL => throw new 
UnsupportedOperationException("Skip merge is not currently " +
-        "implemented for the New Hudi Parquet File format")
-        //new SkipMergeIterator(logFiles, partitionPath, iter, inputSchema, 
tableSchema.value,
-        //  requiredSchemaWithMandatory, requiredAvroSchema, tableState.value, 
hadoopConf)
-      case REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
-        new RecordMergingFileIterator(logFiles, partitionPath, iter, 
inputSchema, tableSchema.value,
-          requiredSchemaWithMandatory, requiredAvroSchema, tableState.value, 
hadoopConf)
-    }
-    appendPartitionAndProject(morIterator, requiredSchemaWithMandatory, 
partitionSchema,
-      outputSchema, partitionValues)
-  }
-
-  /**
-   * Append partition values to rows and project to output schema
-   */
-  protected def appendPartitionAndProject(iter: Iterator[InternalRow],
-                                inputSchema: StructType,
-                                partitionSchema: StructType,
-                                to: StructType,
-                                partitionValues: InternalRow): 
Iterator[InternalRow] = {
-    if (partitionSchema.isEmpty) {
-      projectSchema(iter, inputSchema, to)
-    } else {
-      val unsafeProjection = 
generateUnsafeProjection(StructType(inputSchema.fields ++ 
partitionSchema.fields), to)
-      val joinedRow = new JoinedRow()
-      iter.map(d => unsafeProjection(joinedRow(d, partitionValues)))
-    }
-  }
-
-  protected def projectSchema(iter: Iterator[InternalRow],
-                    from: StructType,
-                    to: StructType): Iterator[InternalRow] = {
-    val unsafeProjection = generateUnsafeProjection(from, to)
-    iter.map(d => unsafeProjection(d))
-  }
-
-  protected def getLogFilesFromSlice(fileSlice: FileSlice): 
List[HoodieLogFile] = {
-    
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
-  }
-
-  protected def getRecordKeyRelatedFilters(filters: Seq[Filter], 
recordKeyColumn: String): Seq[Filter] = {
-    filters.filter(f => f.references.exists(c => 
c.equalsIgnoreCase(recordKeyColumn)))
-  }
-}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
index 9847918adc1..2eb38eedaeb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
@@ -240,9 +240,6 @@ public class TestHoodieMergeHandleWithSparkMerger extends 
SparkClientFunctionalT
     properties.put(
         FILE_GROUP_READER_ENABLED.key(),
         "true");
-    properties.put(
-        DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(),
-        "true");
     properties.put(
         WRITE_RECORD_POSITIONS.key(),
         "true");
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
index 301b651ea69..d926a3be5a4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
@@ -20,11 +20,15 @@ package org.apache.hudi.functional;
 
 import org.apache.hudi.common.model.HoodieTableType;
 
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.util.Map;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
@@ -60,7 +64,6 @@ public class TestBootstrapRead extends TestBootstrapReadBase {
   @ParameterizedTest
   @MethodSource("testArgs")
   public void testBootstrapFunctional(String bootstrapType, Boolean 
dashPartitions, HoodieTableType tableType, Integer nPartitions) {
-    /*
     this.bootstrapType = bootstrapType;
     this.dashPartitions = dashPartitions;
     this.tableType = tableType;
@@ -86,6 +89,5 @@ public class TestBootstrapRead extends TestBootstrapReadBase {
     doInsert(options, "002");
     compareTables();
     verifyMetaColOnlyRead(2);
-     */
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
index d3246f7c4da..d4db2e3eb2f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java
@@ -49,11 +49,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static 
org.apache.hudi.DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT;
-import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
 import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
 import static 
org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
-import static org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Tag("functional")
@@ -185,21 +182,12 @@ public abstract class TestBootstrapReadBase extends 
HoodieSparkClientTestBase {
   protected void compareTables() {
     Dataset<Row> hudiDf = 
sparkSession.read().format("hudi").load(hudiBasePath);
     Dataset<Row> bootstrapDf = 
sparkSession.read().format("hudi").load(bootstrapTargetPath);
-    Dataset<Row> fastBootstrapDf = 
sparkSession.read().format("hudi").option(DATA_QUERIES_ONLY.key(), 
"true").load(bootstrapTargetPath);
-    boolean shouldTestFastBootstrap = tableType.equals(COPY_ON_WRITE) && 
!Boolean.parseBoolean(USE_NEW_HUDI_PARQUET_FILE_FORMAT().defaultValue());
     if (nPartitions == 0) {
       compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns));
-      if (shouldTestFastBootstrap) {
-        compareDf(fastBootstrapDf.drop("city_to_state"), 
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path"));
-      }
       return;
     }
     compareDf(hudiDf.drop(dropColumns).drop(partitionCols), 
bootstrapDf.drop(dropColumns).drop(partitionCols));
     compareDf(hudiDf.select("_row_key",partitionCols), 
bootstrapDf.select("_row_key",partitionCols));
-    if (shouldTestFastBootstrap) {
-      compareDf(fastBootstrapDf.drop("city_to_state").drop(partitionCols), 
bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path").drop(partitionCols));
-      compareDf(fastBootstrapDf.select("_row_key",partitionCols), 
bootstrapDf.select("_row_key",partitionCols));
-    }
   }
 
   protected void verifyMetaColOnlyRead(Integer iteration) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
index efc803666e5..ce462c93d1b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.functional;
 
-import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.model.HoodieTableType;
 
@@ -104,11 +103,9 @@ public class TestNewHoodieParquetFileFormat extends 
TestBootstrapReadBase {
 
   protected void testCount(String tableBasePath) {
     Dataset<Row> legacyDf = sparkSession.read().format("hudi")
-        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "false")
         .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
         .load(tableBasePath);
     Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
-        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "true")
         .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true")
         .load(tableBasePath);
     assertEquals(legacyDf.count(), fileFormatDf.count());
@@ -124,11 +121,9 @@ public class TestNewHoodieParquetFileFormat extends 
TestBootstrapReadBase {
 
   protected void runIndividualComparison(String tableBasePath, String 
firstColumn, String... columns) {
     Dataset<Row> legacyDf = sparkSession.read().format("hudi")
-        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "false")
         .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
         .load(tableBasePath);
     Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
-        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "true")
         .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true")
         .load(tableBasePath);
     if (firstColumn.isEmpty()) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index 6bf7b7b28bb..6e22db914d3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -149,7 +149,6 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
             // NOTE: type promotion is not supported for the custom file 
format and the filegroup reader
             //       HUDI-7045 and PR#10007 in progress to fix the issue
             .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
"false")
-            
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(), "false")
             .load(tempRecordPath)
           readDf.printSchema()
           readDf.show(false)
@@ -392,7 +391,6 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
       // NOTE: long to int type change is not supported for the custom file 
format and the filegroup reader
       //       HUDI-7045 and PR#10007 in progress to fix the issue
       .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
-      .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(), 
"false")
       .load(tempRecordPath)
     readDf.printSchema()
     readDf.show(false)
@@ -489,7 +487,6 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
       // NOTE: type promotion is not supported for the custom file format and 
the filegroup reader
       //       HUDI-7045 and PR#10007 in progress to fix the issue
       .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
-      .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(), 
"false")
       .load(tempRecordPath)
     readDf.printSchema()
     readDf.show(false)
@@ -556,7 +553,6 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
       // NOTE: type promotion is not supported for the custom file format and 
the filegroup reader
       //       HUDI-7045 and PR#10007 in progress to fix the issue
       .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
-      .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(), 
"false")
       .load(tempRecordPath)
     readDf.printSchema()
     readDf.show(false)
@@ -827,7 +823,6 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
       // NOTE: type promotion is not supported for the custom file format and 
the filegroup reader
       //       HUDI-7045 and PR#10007 in progress to fix the issue
       .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
-      .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(), 
"false")
       .load(tempRecordPath)
     readDf.printSchema()
     readDf.show(false)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 5c1d04ee477..78b880be386 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -21,7 +21,6 @@ package org.apache.hudi.common.table.read
 
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
-import org.apache.hudi.DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT
 import 
org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
@@ -111,7 +110,6 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
                                           schema: Schema,
                                           fileGroupId: String): Unit = {
     val expectedDf = spark.read.format("hudi")
-      .option(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(), "false")
       .option(FILE_GROUP_READER_ENABLED.key(), "false")
       .load(basePath)
       .where(col(HoodieRecord.FILENAME_METADATA_FIELD).contains(fileGroupId))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index acf406c9fbc..c82d9ebe635 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -71,7 +71,6 @@ class TestMORDataSourceStorage extends 
SparkClientFunctionalTestHarness {
       options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> 
preCombineField)
     }
     if (useFileGroupReader) {
-      options += (DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key() 
-> String.valueOf(useFileGroupReader))
       options += (HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> 
String.valueOf(useFileGroupReader))
     }
     val dataGen = new HoodieTestDataGenerator(0xDEEF)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
index 1e605b092bf..40871997b28 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
@@ -27,7 +27,7 @@ import org.apache.hudi.common.util
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.util.JFunction
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
QuickstartUtils}
+import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{lit, typedLit}
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -123,7 +123,6 @@ class TestPartialUpdateAvroPayload extends 
HoodieClientTestBase {
       .save(basePath)
 
     val finalDF = spark.read.format("hudi")
-      .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(), 
String.valueOf(useFileGroupReader))
       .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
String.valueOf(useFileGroupReader))
       .load(basePath)
     assertEquals(finalDF.select("rider").collectAsList().get(0).getString(0), 
upsert1DF.select("rider").collectAsList().get(0).getString(0))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
index 25aa955b811..0c1ca31479d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.avro.Schema
-import org.apache.hudi.DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT
 import org.apache.hudi.DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.avro.HoodieAvroUtils
@@ -75,7 +74,6 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
       spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
       spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
       spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
-      spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
 
       // Create a table with five data fields
       spark.sql(
@@ -124,7 +122,6 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
       spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
       spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
       spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
-      spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
       // Write inserts to log block
       spark.sql(s"set ${INDEX_TYPE.key} = INMEMORY")
 
@@ -183,7 +180,6 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
       spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
       spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
       spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
-      spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
 
       // Create a table with five data fields
       spark.sql(
@@ -287,7 +283,6 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
       spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
       spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
       spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
-      spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
 
       // Create a table with five data fields
       spark.sql(
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index f87e5c231bf..2b52bda04f2 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -62,9 +62,7 @@ public class HoodieIncrSource extends RowSource {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieIncrSource.class);
   public static final Set<String> HOODIE_INCR_SOURCE_READ_OPT_KEYS =
-      CollectionUtils.createImmutableSet(
-          "hoodie.datasource.read.use.new.parquet.file.format",
-          HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key());
+      
CollectionUtils.createImmutableSet(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key());
   private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
 
   public static class Config {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index 42c485ea2e2..6b364667054 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -20,7 +20,6 @@
 package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
@@ -96,7 +95,6 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase 
extends HoodieDeltaStrea
 
   protected Map<String, String> readOpts = new HashMap<String, String>() {
     {
-      put(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), 
"false");
       put(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
     }
   };
@@ -146,7 +144,6 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase 
extends HoodieDeltaStrea
 
   protected HoodieDeltaStreamer.Config getDeltaStreamerConfig(String[] 
transformerClasses, boolean nullForDeletedCols,
                                                               TypedProperties 
extraProps) throws IOException {
-    
extraProps.setProperty(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(),
 "false");
     extraProps.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
"false");
     extraProps.setProperty("hoodie.datasource.write.table.type", tableType);
     extraProps.setProperty("hoodie.datasource.write.row.writer.enable", 
rowWriterEnable.toString());

Reply via email to