This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e44ac12356f9 refactor(spark): Rework tests that disable FileGroup Reader in Spark (#14061)
e44ac12356f9 is described below

commit e44ac12356f9a855d43b942829de0b2628ac083b
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Dec 2 19:14:56 2025 -0500

    refactor(spark): Rework tests that disable FileGroup Reader in Spark (#14061)
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Y Ethan Guo <[email protected]>
    Co-authored-by: Timothy Brown <[email protected]>
---
 .../main/scala/org/apache/hudi/DefaultSource.scala |  54 ++-----
 .../functional/TestFiltersInFileGroupReader.java   |  14 +-
 .../functional/TestNewHoodieParquetFileFormat.java | 158 -------------------
 .../hudi/TestAvroSchemaResolutionSupport.scala     | 173 ++++++++-------------
 .../hudi/functional/TestCOWDataSourceStorage.scala |  25 ++-
 .../TestMetadataTableWithSparkDataSource.scala     |   2 +-
 .../functional/TestPartialUpdateAvroPayload.scala  |  18 +--
 .../TestSparkSqlWithTimestampKeyGenerator.scala    |  24 +--
 .../TestVectorizedReadWithSchemaEvolution.scala    |   3 +-
 .../sql/hudi/dml/insert/TestInsertTable4.scala     |  37 -----
 10 files changed, 108 insertions(+), 400 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 67f8c321b747..1520cd1bd34d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -294,9 +294,7 @@ object DefaultSource {
         Option(schema)
       }
 
-      lazy val enableFileGroupReader = SparkConfigUtils
-        .getStringWithAltKeys(parameters, 
HoodieReaderConfig.FILE_GROUP_READER_ENABLED).toBoolean &&
-        !metaClient.isMetadataTable
+      lazy val isNotMetadataTable = !metaClient.isMetadataTable
       lazy val tableVersion = if 
(SparkConfigUtils.containsConfigProperty(parameters, 
INCREMENTAL_READ_TABLE_VERSION)) {
         Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
       } else {
@@ -306,7 +304,7 @@ object DefaultSource {
       if 
(metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) {
         new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters, 
Some(schema)))
       } else if (isCdcQuery) {
-        if (enableFileGroupReader) {
+        if (isNotMetadataTable) {
           if (tableType == COPY_ON_WRITE) {
             new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
               sqlContext, metaClient, parameters, userSchema, isBootstrap = 
false).build()
@@ -323,24 +321,23 @@ object DefaultSource {
           case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
                (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
                (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
-            if (enableFileGroupReader) {
+            if (isNotMetadataTable) {
               new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
                 sqlContext, metaClient, parameters, userSchema, isBootstrap = 
false).build()
             } else {
               resolveBaseFileOnlyRelation(sqlContext, userSchema, metaClient, 
parameters)
             }
           case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-            (hoodieTableSupportsCompletionTime, enableFileGroupReader) match {
-              case (true, true) => new 
HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2(
+            if (hoodieTableSupportsCompletionTime) {
+              new HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2(
                 sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
-              case (true, false) => new IncrementalRelationV2(sqlContext, 
parameters, userSchema, metaClient, RangeType.OPEN_CLOSED)
-              case (false, true) => new 
HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV1(
+            } else {
+              new HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV1(
                 sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
-              case (false, false) => new IncrementalRelationV1(sqlContext, 
parameters, userSchema, metaClient)
             }
 
           case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
-            if (enableFileGroupReader) {
+            if (isNotMetadataTable) {
               new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
                 sqlContext, metaClient, parameters, userSchema, isBootstrap = 
false).build()
             } else {
@@ -348,7 +345,7 @@ object DefaultSource {
             }
 
           case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
-            if (enableFileGroupReader) {
+            if (isNotMetadataTable) {
               new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
                 sqlContext, metaClient, parameters, userSchema, isBootstrap = 
true).build()
             } else {
@@ -356,22 +353,17 @@ object DefaultSource {
             }
 
           case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-            (hoodieTableSupportsCompletionTime, enableFileGroupReader) match {
-              case (true, true) => new 
HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2(
+            if (hoodieTableSupportsCompletionTime) {
+              new HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2(
                 sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
-              case (true, false) => 
MergeOnReadIncrementalRelationV2(sqlContext, parameters, metaClient, userSchema)
-              case (false, true) => new 
HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV1(
+            } else {
+              new HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV1(
                 sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
-              case (false, false) => 
MergeOnReadIncrementalRelationV1(sqlContext, parameters, metaClient, userSchema)
             }
 
           case (_, _, true) =>
-            if (enableFileGroupReader) {
-              new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
-                sqlContext, metaClient, parameters, userSchema, isBootstrap = 
true).build()
-            } else {
-              resolveHoodieBootstrapRelation(sqlContext, userSchema, 
metaClient, parameters)
-            }
+            new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
+              sqlContext, metaClient, parameters, userSchema, isBootstrap = 
true).build()
 
           case (_, _, _) =>
             throw new HoodieException(s"Invalid query type : $queryType for 
tableType: $tableType," +
@@ -381,22 +373,6 @@ object DefaultSource {
     }
   }
 
-  private def resolveHoodieBootstrapRelation(sqlContext: SQLContext,
-                                             userSchema: Option[StructType],
-                                             metaClient: HoodieTableMetaClient,
-                                             parameters: Map[String, String]): 
BaseRelation = {
-    val enableFileIndex = HoodieSparkConfUtils.getConfigValue(parameters, 
sqlContext.sparkSession.sessionState.conf,
-      ENABLE_HOODIE_FILE_INDEX.key, 
ENABLE_HOODIE_FILE_INDEX.defaultValue.toString).toBoolean
-    val isSchemaEvolutionEnabledOnRead = 
HoodieSparkConfUtils.getConfigValue(parameters,
-      sqlContext.sparkSession.sessionState.conf, 
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
-      
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
-    if (!enableFileIndex || isSchemaEvolutionEnabledOnRead || 
!parameters.getOrElse(DATA_QUERIES_ONLY.key, 
DATA_QUERIES_ONLY.defaultValue).toBoolean) {
-      HoodieBootstrapRelation(sqlContext, userSchema, metaClient, parameters + 
(DATA_QUERIES_ONLY.key() -> "false"))
-    } else {
-      HoodieBootstrapRelation(sqlContext, userSchema, metaClient, 
parameters).toHadoopFsRelation
-    }
-  }
-
   private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
                                           userSchema: Option[StructType],
                                           metaClient: HoodieTableMetaClient,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java
index c1221c7b4b32..85bbad2e3233 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java
@@ -77,30 +77,28 @@ class TestFiltersInFileGroupReader extends 
TestBootstrapReadBase {
   }
 
   protected void runComparison(boolean mergeUseRecordPositions) {
-    compareDf(createDf(hudiBasePath, true, mergeUseRecordPositions), 
createDf(hudiBasePath, false, false));
-    compareDf(createDf(bootstrapTargetPath, true, mergeUseRecordPositions), 
createDf(bootstrapTargetPath, false, false));
-    compareDf(createDf2(hudiBasePath, true, mergeUseRecordPositions), 
createDf2(hudiBasePath, false, false));
-    compareDf(createDf2(bootstrapTargetPath, true, mergeUseRecordPositions), 
createDf2(bootstrapTargetPath, false, false));
+    compareDf(createDf(hudiBasePath, mergeUseRecordPositions), 
createDf(hudiBasePath, false));
+    compareDf(createDf(bootstrapTargetPath, mergeUseRecordPositions), 
createDf(bootstrapTargetPath, false));
+    compareDf(createDf2(hudiBasePath, mergeUseRecordPositions), 
createDf2(hudiBasePath, false));
+    compareDf(createDf2(bootstrapTargetPath, mergeUseRecordPositions), 
createDf2(bootstrapTargetPath, false));
   }
 
-  protected Dataset<Row> createDf(String tableBasePath, Boolean 
fgReaderEnabled, Boolean mergeUseRecordPositions) {
+  protected Dataset<Row> createDf(String tableBasePath, boolean 
mergeUseRecordPositions) {
     //The chances of a uuid containing 00 with the 8-4-4-4-12 format is around 
90%
     //for bootstrap, _hoodie_record_key is in the skeleton while begin_lat is 
in the data
     //We have a record key filter so that tests MORs filter pushdown with 
position based merging
     return sparkSession.read().format("hudi")
-        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
fgReaderEnabled)
         .option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
mergeUseRecordPositions)
         .load(tableBasePath)
         .drop("city_to_state")
         .where("begin_lat > 0.5 and _hoodie_record_key LIKE '%00%'");
   }
 
-  protected Dataset<Row> createDf2(String tableBasePath, Boolean 
fgReaderEnabled, Boolean mergeUseRecordPositions) {
+  protected Dataset<Row> createDf2(String tableBasePath, boolean 
mergeUseRecordPositions) {
     //The chances of a uuid containing 00 with the 8-4-4-4-12 format is around 
90%
     //for bootstrap, _hoodie_record_key is in the skeleton while begin_lat is 
in the data
     //We have a record key filter so that tests MORs filter pushdown with 
position based merging
     return sparkSession.read().format("hudi")
-        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
fgReaderEnabled)
         .option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
mergeUseRecordPositions)
         .load(tableBasePath)
         .drop("city_to_state")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
deleted file mode 100644
index f1297643c72f..000000000000
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.functional;
-
-import org.apache.hudi.DataSourceReadOptions;
-import org.apache.hudi.common.config.HoodieReaderConfig;
-import org.apache.hudi.common.model.HoodieTableType;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SaveMode;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Stream;
-
-import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
-import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-@Tag("functional-b")
-public class TestNewHoodieParquetFileFormat extends TestBootstrapReadBase {
-
-  private static Stream<Arguments> testArgs() {
-    Stream.Builder<Arguments> b = Stream.builder();
-    b.add(Arguments.of(MERGE_ON_READ, 0));
-    b.add(Arguments.of(COPY_ON_WRITE, 1));
-    b.add(Arguments.of(MERGE_ON_READ, 2));
-    return b.build();
-  }
-
-  @ParameterizedTest
-  @MethodSource("testArgs")
-  void testNewParquetFileFormat(HoodieTableType tableType, Integer 
nPartitions) {
-    this.bootstrapType = nPartitions == 0 ? "metadata" : "mixed";
-    this.dashPartitions = true;
-    this.tableType = tableType;
-    this.nPartitions = nPartitions;
-    setupDirs();
-
-    //do bootstrap
-    Map<String, String> options = setBootstrapOptions();
-    Dataset<Row> bootstrapDf = sparkSession.emptyDataFrame();
-    bootstrapDf.write().format("hudi")
-        .options(options)
-        .mode(SaveMode.Overwrite)
-        .save(bootstrapTargetPath);
-    runComparisons();
-
-    options = basicOptions();
-    doUpdate(options, "001");
-    runComparisons();
-
-    doInsert(options, "002");
-    runComparisons();
-
-    doDelete(options, "003");
-    runComparisons();
-  }
-
-  protected void runComparisons() {
-    if (tableType.equals(MERGE_ON_READ)) {
-      runComparison(hudiBasePath);
-    }
-    runComparison(bootstrapTargetPath);
-  }
-
-  protected void runComparison(String tableBasePath) {
-    testCount(tableBasePath);
-    runIndividualComparison(tableBasePath);
-    runIndividualComparison(tableBasePath, "partition_path");
-    runIndividualComparison(tableBasePath, "_hoodie_record_key", 
"_hoodie_commit_time", "_hoodie_partition_path");
-    runIndividualComparison(tableBasePath, "_hoodie_commit_time", 
"_hoodie_commit_seqno");
-    runIndividualComparison(tableBasePath, "_hoodie_commit_time", 
"_hoodie_commit_seqno", "partition_path");
-    runIndividualComparison(tableBasePath, "_row_key", "_hoodie_commit_seqno", 
"_hoodie_record_key", "_hoodie_partition_path");
-    runIndividualComparison(tableBasePath, "_row_key", "partition_path", 
"_hoodie_is_deleted", "begin_lon");
-  }
-
-  protected void testCount(String tableBasePath) {
-    Dataset<Row> legacyDf = sparkSession.read().format("hudi")
-        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
-        .load(tableBasePath);
-    Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
-        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true")
-        .load(tableBasePath);
-    assertEquals(legacyDf.count(), fileFormatDf.count());
-  }
-
-  protected scala.collection.Seq<String> seq(String... a) {
-    return 
scala.collection.JavaConverters.asScalaBufferConverter(Arrays.asList(a)).asScala().toSeq();
-  }
-
-  protected void runIndividualComparison(String tableBasePath) {
-    runIndividualComparison(tableBasePath, "");
-  }
-
-  protected void runIndividualComparison(String tableBasePath, String 
firstColumn, String... columns) {
-    List<String> queryTypes = new ArrayList<>();
-    queryTypes.add(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL());
-    if (tableType.equals(MERGE_ON_READ)) {
-      
queryTypes.add(DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL());
-    }
-    for (String queryType : queryTypes) {
-      Dataset<Row> legacyDf = sparkSession.read().format("hudi")
-          .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
-          .option(DataSourceReadOptions.QUERY_TYPE().key(), queryType)
-          .load(tableBasePath);
-      Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
-          .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true")
-          .option(DataSourceReadOptions.QUERY_TYPE().key(), queryType)
-          .load(tableBasePath);
-      if (firstColumn.isEmpty()) {
-        //df.except(df) does not work with map type cols
-        legacyDf = legacyDf.drop("city_to_state");
-        fileFormatDf = fileFormatDf.drop("city_to_state");
-
-        //TODO: [HUDI-3204] for toHadoopFs in BaseFileOnlyRelation, the 
partition columns will be at the end
-        //so just drop column that is out of order here for now
-        if 
(queryType.equals(DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL())
-            && tableType.equals(MERGE_ON_READ) && nPartitions > 0) {
-          legacyDf = legacyDf.drop("partition_path");
-          fileFormatDf = fileFormatDf.drop("partition_path");
-        }
-      } else {
-        if (columns.length > 0) {
-          legacyDf = legacyDf.select(firstColumn, columns);
-          fileFormatDf = fileFormatDf.select(firstColumn, columns);
-        } else {
-          legacyDf = legacyDf.select(firstColumn);
-          fileFormatDf = fileFormatDf.select(firstColumn);
-        }
-      }
-      compareDf(legacyDf, fileFormatDf);
-    }
-  }
-}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index 60acf4aaa03c..56eccc0a1376 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -18,8 +18,7 @@
  */
 package org.apache.hudi
 
-import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig}
-import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.SchemaCompatibilityException
@@ -30,7 +29,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource}
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
 import scala.language.postfixOps
 
@@ -74,28 +73,18 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     case 4 => df.withColumn(colToCast, df.col(colToCast).cast("string"))
   }
 
-  def initialiseTable(df: DataFrame, saveDir: String, isCow: Boolean = true): 
Unit = {
-    val opts = if (isCow) {
-      commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> 
"COPY_ON_WRITE")
-    } else {
-      commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> 
"MERGE_ON_READ")
-    }
-
+  def initialiseTable(df: DataFrame, saveDir: String, tableType: String = 
"COPY_ON_WRITE"): Unit = {
+    val opts = commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> 
tableType)
     df.write.format("hudi")
       .options(opts)
       .mode("overwrite")
       .save(saveDir)
   }
 
-  def upsertData(df: DataFrame, saveDir: String, isCow: Boolean = true, 
shouldAllowDroppedColumns: Boolean = false,
+  def upsertData(df: DataFrame, saveDir: String, tableType: String = 
"COPY_ON_WRITE", shouldAllowDroppedColumns: Boolean = false,
                  enableSchemaValidation: Boolean = 
HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue().toBoolean): Unit = 
{
-    var opts = if (isCow) {
-      commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> 
"COPY_ON_WRITE")
-    } else {
-      commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> 
"MERGE_ON_READ")
-    }
-    opts = opts ++ 
Map(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> 
shouldAllowDroppedColumns.toString)
-
+    val opts = commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> 
tableType,
+      HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> 
shouldAllowDroppedColumns.toString)
     df.write.format("hudi")
       .options(opts)
       .mode("append")
@@ -103,8 +92,8 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
   }
 
   @ParameterizedTest
-  @MethodSource(Array("testArgs"))
-  def testDataTypePromotions(isCow: Boolean, useFileGroupReader: Boolean): 
Unit = {
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testDataTypePromotions(tableType: String): Unit = {
     // test to read tables with columns that are promoted via avro schema 
resolution
     val tempRecordPath = basePath + "/record_tbl/"
     val _spark = spark
@@ -134,7 +123,7 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
           initDF.show(false)
 
           // recreate table
-          initialiseTable(initDF, tempRecordPath, isCow)
+          initialiseTable(initDF, tempRecordPath, tableType)
 
           // perform avro supported casting
           var upsertDf = prepDataFrame(df2, colInitType)
@@ -143,11 +132,10 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
           upsertDf.show(false)
 
           // upsert
-          upsertData(upsertDf, tempRecordPath, isCow)
+          upsertData(upsertDf, tempRecordPath, tableType)
 
           // read out the table
           val readDf = spark.read.format("hudi")
-            .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
useFileGroupReader)
             .load(tempRecordPath)
           readDf.printSchema()
           readDf.show(false)
@@ -177,8 +165,8 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
   }
 
   @ParameterizedTest
-  @MethodSource(Array("testArgs"))
-  def testAddNewColumn(isCow: Boolean, useFileGroupReader: Boolean): Unit = {
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testAddNewColumn(tableType: String): Unit = {
     // test to add a column
     val tempRecordPath = basePath + "/record_tbl/"
     val _spark = spark
@@ -193,7 +181,7 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     initDF.show(false)
 
     // recreate table
-    initialiseTable(initDF, tempRecordPath, isCow)
+    initialiseTable(initDF, tempRecordPath, tableType)
 
     // perform avro supported operation of adding a new column at the end of 
the table
     val upsertDf = df2
@@ -201,11 +189,10 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     upsertDf.show(false)
 
     // upsert
-    upsertData(upsertDf, tempRecordPath, isCow)
+    upsertData(upsertDf, tempRecordPath, tableType)
 
     // read out the table
     val readDf = spark.read.format("hudi")
-      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
useFileGroupReader)
       .load(tempRecordPath)
     readDf.printSchema()
     readDf.show(false)
@@ -224,7 +211,6 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     val tempRecordPath = basePath + "/record_tbl/"
     val _spark = spark
     import _spark.implicits._
-    val isCow = tableType.equals(HoodieTableType.COPY_ON_WRITE.name())
 
     val df1 = Seq((1, 100, "aaa")).toDF("id", "userid", "name")
     val df2 = Seq((2, "bbb")).toDF("id", "name")
@@ -235,7 +221,7 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     initDF.show(false)
 
     // recreate table
-    initialiseTable(initDF, tempRecordPath, isCow)
+    initialiseTable(initDF, tempRecordPath, tableType)
 
     // perform avro supported operation of deleting a column
     val upsertDf = df2
@@ -244,16 +230,15 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
 
     // upsert
     assertThrows(classOf[SchemaCompatibilityException]) {
-      upsertData(upsertDf, tempRecordPath, isCow, enableSchemaValidation = 
schemaValidationEnabled)
+      upsertData(upsertDf, tempRecordPath, tableType, enableSchemaValidation = 
schemaValidationEnabled)
     }
 
-    upsertData(upsertDf, tempRecordPath, isCow, shouldAllowDroppedColumns = 
true, enableSchemaValidation = schemaValidationEnabled)
+    upsertData(upsertDf, tempRecordPath, tableType, shouldAllowDroppedColumns 
= true, enableSchemaValidation = schemaValidationEnabled)
 
     // read out the table
     //schemaValidationEnabled is a writer config, so we will also test the fg 
reader with the
     //same param since that will only affect the reader
     val readDf = spark.read.format("hudi")
-      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
schemaValidationEnabled)
       .load(tempRecordPath)
     readDf.printSchema()
     readDf.show(false)
@@ -261,8 +246,8 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
   }
 
   @ParameterizedTest
-  @MethodSource(Array("testArgs"))
-  def testColumnPositionChange(isCow: Boolean, useFileGroupReader: Boolean): 
Unit = {
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testColumnPositionChange(tableType: String): Unit = {
     // test to change column positions
     val tempRecordPath = basePath + "/record_tbl/"
     val _spark = spark
@@ -277,7 +262,7 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     initDF.show(false)
 
     // recreate table
-    initialiseTable(initDF, tempRecordPath, isCow)
+    initialiseTable(initDF, tempRecordPath, tableType)
 
     // perform avro supported operation of deleting a column
     val upsertDf = df2
@@ -285,11 +270,10 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     upsertDf.show(false)
 
     // upsert
-    upsertData(upsertDf, tempRecordPath, isCow)
+    upsertData(upsertDf, tempRecordPath, tableType)
 
     // read out the table
     val readDf = spark.read.format("hudi")
-      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
useFileGroupReader)
       .load(tempRecordPath)
     readDf.printSchema()
     readDf.show(false)
@@ -297,8 +281,8 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
   }
 
   @ParameterizedTest
-  @MethodSource(Array("testArgs"))
-  def testArrayOfStructsAddNewColumn(isCow: Boolean, useFileGroupReader: 
Boolean): Unit = {
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testArrayOfStructsAddNewColumn(tableType: String): Unit = {
     // test to add a field to a STRUCT in a column of ARRAY< STRUCT<..> > type
     val tempRecordPath = basePath + "/record_tbl/"
     val arrayStructData = Seq(
@@ -317,7 +301,7 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df1.show(false)
 
     // recreate table
-    initialiseTable(df1, tempRecordPath, isCow)
+    initialiseTable(df1, tempRecordPath, tableType)
 
     // add a column to array of struct
     val newArrayStructData = Seq(
@@ -337,11 +321,10 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df2.printSchema()
     df2.show(false)
     // upsert
-    upsertData(df2, tempRecordPath, isCow)
+    upsertData(df2, tempRecordPath, tableType)
 
     // read out the table
     val readDf = spark.read.format("hudi")
-      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
useFileGroupReader)
       .load(tempRecordPath)
     readDf.printSchema()
     readDf.show(false)
@@ -349,8 +332,8 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
   }
 
   @ParameterizedTest
-  @MethodSource(Array("testArgs"))
-  def testArrayOfStructsChangeColumnType(isCow: Boolean, useFileGroupReader: 
Boolean): Unit = {
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testArrayOfStructsChangeColumnType(tableType: String): Unit = {
     // test to change the type of a field from a STRUCT in a column of ARRAY< 
STRUCT<..> > type
     val tempRecordPath = basePath + "/record_tbl/"
     val arrayStructData = Seq(
@@ -369,7 +352,7 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df1.show(false)
 
     // recreate table
-    initialiseTable(df1, tempRecordPath, isCow)
+    initialiseTable(df1, tempRecordPath, tableType)
 
     // add a column to array of struct
     val newArrayStructData = Seq(
@@ -387,12 +370,11 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df2.printSchema()
     df2.show(false)
     // upsert
-    upsertData(df2, tempRecordPath, isCow)
+    upsertData(df2, tempRecordPath, tableType)
 
     withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> 
"false") {
       // read out the table
       val readDf = spark.read.format("hudi")
-        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
useFileGroupReader)
         .load(tempRecordPath)
       readDf.printSchema()
       readDf.show(false)
@@ -401,8 +383,8 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
   }
 
   @ParameterizedTest
-  @MethodSource(Array("testArgs"))
-  def testArrayOfStructsChangeColumnPosition(isCow: Boolean, 
useFileGroupReader: Boolean): Unit = {
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testArrayOfStructsChangeColumnPosition(tableType: String): Unit = {
     // test to change the position of a field from a STRUCT in a column of 
ARRAY< STRUCT<..> > type
     val tempRecordPath = basePath + "/record_tbl/"
     val arrayStructData = Seq(
@@ -421,7 +403,7 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df1.show(false)
 
     // recreate table
-    initialiseTable(df1, tempRecordPath, isCow)
+    initialiseTable(df1, tempRecordPath, tableType)
 
     // add a column to array of struct
     val newArrayStructData = Seq(
@@ -439,11 +421,10 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df2.printSchema()
     df2.show(false)
     // upsert
-    upsertData(df2, tempRecordPath, isCow)
+    upsertData(df2, tempRecordPath, tableType)
 
     // read out the table
     val readDf = spark.read.format("hudi")
-      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
useFileGroupReader)
       .load(tempRecordPath)
     readDf.printSchema()
     readDf.show(false)
@@ -451,8 +432,8 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
   }
 
   @ParameterizedTest
-  @MethodSource(Array("testArgs"))
-  def testArrayOfMapsChangeValueType(isCow: Boolean, useFileGroupReader: 
Boolean): Unit = {
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testArrayOfMapsChangeValueType(tableType: String): Unit = {
     // test to change the value type of a MAP in a column of ARRAY< MAP<k,v> > 
type
     val tempRecordPath = basePath + "/record_tbl/"
     val arrayMapData = Seq(
@@ -469,7 +450,7 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df1.show(false)
 
     // recreate table
-    initialiseTable(df1, tempRecordPath, isCow)
+    initialiseTable(df1, tempRecordPath, tableType)
 
     // change value type from integer to long
     val newArrayMapData = Seq(
@@ -485,12 +466,11 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df2.printSchema()
     df2.show(false)
     // upsert
-    upsertData(df2, tempRecordPath, isCow)
+    upsertData(df2, tempRecordPath, tableType)
 
     withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> 
"false") {
       // read out the table
       val readDf = spark.read.format("hudi")
-        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
useFileGroupReader)
         .load(tempRecordPath)
       readDf.printSchema()
       readDf.show(false)
@@ -499,8 +479,8 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
   }
 
   @ParameterizedTest
-  @MethodSource(Array("testArgs"))
-  def testArrayOfMapsStructChangeFieldType(isCow: Boolean, useFileGroupReader: 
Boolean): Unit = {
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testArrayOfMapsStructChangeFieldType(tableType: String): Unit = {
     // test to change a field type of a STRUCT in a column of ARRAY< MAP< 
k,STRUCT<..> > > type
     val tempRecordPath = basePath + "/record_tbl/"
     val arrayMapData = Seq(
@@ -526,7 +506,7 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df1.show(false)
 
     // recreate table
-    initialiseTable(df1, tempRecordPath, isCow)
+    initialiseTable(df1, tempRecordPath, tableType)
 
     // change inner struct's type from integer to long
     val newArrayMapData = Seq(
@@ -551,12 +531,11 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df2.printSchema()
     df2.show(false)
     // upsert
-    upsertData(df2, tempRecordPath, isCow)
+    upsertData(df2, tempRecordPath, tableType)
 
     withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> 
"false") {
       // read out the table
       val readDf = spark.read.format("hudi")
-        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
useFileGroupReader)
         .load(tempRecordPath)
       readDf.printSchema()
       readDf.show(false)
@@ -565,8 +544,8 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
   }
 
   @ParameterizedTest
-  @MethodSource(Array("testArgs"))
-  def testArrayOfMapsStructAddField(isCow: Boolean, useFileGroupReader: 
Boolean): Unit = {
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testArrayOfMapsStructAddField(tableType: String): Unit = {
     // test to add a field to a STRUCT in a column of ARRAY< MAP< k,STRUCT<..> 
> > type
     val tempRecordPath = basePath + "/record_tbl/"
     val arrayMapData = Seq(
@@ -592,7 +571,7 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df1.show(false)
 
     // recreate table
-    initialiseTable(df1, tempRecordPath, isCow)
+    initialiseTable(df1, tempRecordPath, tableType)
 
     // add a new column
     val newArrayMapData = Seq(
@@ -618,11 +597,10 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df2.printSchema()
     df2.show(false)
     // upsert
-    upsertData(df2, tempRecordPath, isCow)
+    upsertData(df2, tempRecordPath, tableType)
 
     // read out the table
     val readDf = spark.read.format("hudi")
-      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
useFileGroupReader)
       .load(tempRecordPath)
     readDf.printSchema()
     readDf.show(false)
@@ -630,8 +608,8 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
   }
 
   @ParameterizedTest
-  @MethodSource(Array("testArgs"))
-  def testArrayOfMapsStructChangeFieldPosition(isCow: Boolean, 
useFileGroupReader: Boolean): Unit = {
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testArrayOfMapsStructChangeFieldPosition(tableType: String): Unit = {
     // test to change the position of fields of a STRUCT in a column of ARRAY< 
MAP< k,STRUCT<..> > > type
     val tempRecordPath = basePath + "/record_tbl/"
     val arrayMapData = Seq(
@@ -657,7 +635,7 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df1.show(false)
 
     // recreate table
-    initialiseTable(df1, tempRecordPath, isCow)
+    initialiseTable(df1, tempRecordPath, tableType)
 
     // change column position
     val newArrayMapData = Seq(
@@ -682,11 +660,10 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df2.printSchema()
     df2.show(false)
     // upsert
-    upsertData(df2, tempRecordPath, isCow)
+    upsertData(df2, tempRecordPath, tableType)
 
     // read out the table
     val readDf = spark.read.format("hudi")
-      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
useFileGroupReader)
       .load(tempRecordPath)
     readDf.printSchema()
     readDf.show(false)
@@ -694,8 +671,8 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
   }
 
   @ParameterizedTest
-  @MethodSource(Array("testArgs"))
-  def testArrayOfMapsStructDeleteField(isCow: Boolean, useFileGroupReader: 
Boolean): Unit = {
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testArrayOfMapsStructDeleteField(tableType: String): Unit = {
     // test to delete a field of a STRUCT in a column of ARRAY< MAP< 
k,STRUCT<..> > > type
 
     val tempRecordPath = basePath + "/record_tbl/"
@@ -722,7 +699,7 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df1.show(false)
 
     // recreate table
-    initialiseTable(df1, tempRecordPath, isCow)
+    initialiseTable(df1, tempRecordPath, tableType)
 
     // change column position
     val newArrayMapData = Seq(
@@ -746,11 +723,10 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df2.printSchema()
     df2.show(false)
     // upsert
-    upsertData(df2, tempRecordPath, isCow, true)
+    upsertData(df2, tempRecordPath, tableType, true)
 
     // read out the table
     val readDf = spark.read.format("hudi")
-      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
useFileGroupReader)
       .load(tempRecordPath)
     readDf.printSchema()
     readDf.show(false)
@@ -758,8 +734,8 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
   }
 
   @ParameterizedTest
-  @MethodSource(Array("testArgs"))
-  def testComplexOperationsOnTable(isCow: Boolean, useFileGroupReader: 
Boolean): Unit = {
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testComplexOperationsOnTable(tableType: String): Unit = {
     // test a series of changes on a Hudi table
 
     var defaultPartitionIdx = 0
@@ -777,13 +753,13 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     val df1 = Seq((1, 100, newPartition)).toDF("id", "userid", "name")
     df1.printSchema()
     df1.show(false)
-    initialiseTable(df1, tempRecordPath, isCow)
+    initialiseTable(df1, tempRecordPath, tableType)
 
     // 2. Promote INT type to LONG into a different partition
     val df2 = Seq((2, 200L, newPartition)).toDF("id", "userid", "name")
     df2.printSchema()
     df2.show(false)
-    upsertData(df2, tempRecordPath, isCow)
+    upsertData(df2, tempRecordPath, tableType)
 
     // 3. Promote LONG to FLOAT
     var df3 = Seq((3, 300, newPartition)).toDF("id", "userid", "name")
@@ -825,7 +801,6 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
 
     // read out the table
     val readDf = spark.read.format("hudi")
-      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
useFileGroupReader)
       .load(tempRecordPath)
     readDf.printSchema()
     readDf.show(false)
@@ -833,8 +808,8 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
   }
 
   @ParameterizedTest
-  @MethodSource(Array("testArgs"))
-  def testNestedTypeVectorizedReadWithTypeChange(isCow: Boolean, 
useFileGroupReader: Boolean): Unit = {
+  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
+  def testNestedTypeVectorizedReadWithTypeChange(tableType: String): Unit = {
     // test to change the value type of a MAP in a column of ARRAY< MAP<k,v> > 
type
     val tempRecordPath = basePath + "/record_tbl/"
     val arrayMapData = Seq(
@@ -851,10 +826,10 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df1.show(false)
 
     // recreate table
-    initialiseTable(df1, tempRecordPath, isCow)
+    initialiseTable(df1, tempRecordPath, tableType)
 
     // read out the table, will not throw any exception
-    readTable(tempRecordPath, useFileGroupReader)
+    readTable(tempRecordPath)
 
     // change value type from integer to long
     val newArrayMapData = Seq(
@@ -870,31 +845,30 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
     df2.printSchema()
     df2.show(false)
     // upsert
-    upsertData(df2, tempRecordPath, isCow)
+    upsertData(df2, tempRecordPath, tableType)
 
     // after implicit type change, read the table with vectorized read enabled
-    if (HoodieSparkUtils.gteqSpark3_4 || !useFileGroupReader) {
+    if (HoodieSparkUtils.gteqSpark3_4) {
       assertThrows(classOf[SparkException]) {
         withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> 
"true") {
-          readTable(tempRecordPath, useFileGroupReader)
+          readTable(tempRecordPath)
         }
       }
     } else {
       withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> 
"true") {
-        readTable(tempRecordPath, useFileGroupReader)
+        readTable(tempRecordPath)
       }
     }
 
     withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> 
"false") {
-      readTable(tempRecordPath, useFileGroupReader)
+      readTable(tempRecordPath)
     }
   }
 
 
-  private def readTable(path: String, useFileGroupReader: Boolean): Unit = {
+  private def readTable(path: String): Unit = {
     // read out the table
     val readDf = spark.read.format("hudi")
-      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
useFileGroupReader)
       .load(path)
     readDf.printSchema()
     readDf.show(false)
@@ -918,14 +892,3 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase with ScalaAss
   }
 
 }
-
-object TestAvroSchemaResolutionSupport {
-  def testArgs: java.util.stream.Stream[Arguments] = {
-    val scenarios = Array(
-      Seq(true, true),
-      Seq(false, true),
-      Seq(false, false)
-    )
-    java.util.Arrays.stream(scenarios.map(as => 
Arguments.arguments(as.map(_.asInstanceOf[AnyRef]):_*)))
-  }
-}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index 8986a8ff55f6..59edb399259a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -21,7 +21,7 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieDataSourceHelpers}
 import org.apache.hudi.client.validator.{SqlQueryEqualityPreCommitValidator, 
SqlQueryInequalityPreCommitValidator}
-import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig}
+import org.apache.hudi.common.config.HoodieMetadataConfig
 import 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
 TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TYPE_FIELD}
 import org.apache.hudi.common.model.WriteOperationType
 import org.apache.hudi.common.table.HoodieTableConfig
@@ -72,28 +72,21 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
 
   @ParameterizedTest
   @CsvSource(value = Array(
-    "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|true",
-    
"true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|true",
-    "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|true",
-    "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|true",
-    
"false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|true",
-    "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|true",
-    "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|false",
-    
"true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|false",
-    "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|false",
-    "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|false",
-    
"false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|false",
-    "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|false"
+    "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
+    "true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency",
+    "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key",
+    "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
+    "false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency",
+    "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key"
   ), delimiter = '|')
-  def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, 
recordKeys: String, fileGroupReaderEnabled: Boolean): Unit = {
+  def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, 
recordKeys: String): Unit = {
     var options: Map[String, String] = commonOpts ++ Map(
       HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled),
       DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> keyGenClass,
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> recordKeys,
       HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true")
 
-    val readOptions = Map(HoodieMetadataConfig.ENABLE.key() -> 
String.valueOf(isMetadataEnabled),
-      HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> 
String.valueOf(fileGroupReaderEnabled))
+    val readOptions = Map(HoodieMetadataConfig.ENABLE.key() -> 
String.valueOf(isMetadataEnabled))
 
     val isTimestampBasedKeyGen: Boolean = 
classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
     if (isTimestampBasedKeyGen) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
index 207cb9018eb2..feccc6b4377b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
@@ -41,7 +41,7 @@ import 
org.apache.hudi.util.JavaScalaConverters.convertJavaListToScalaSeq
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.functions.{col, explode}
-import org.junit.jupiter.api.{Tag, Test}
+import org.junit.jupiter.api.{Disabled, Tag, Test}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
index 2f59db9a5107..9ed6ed282ef7 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
@@ -21,7 +21,7 @@ package org.apache.hudi.functional
 import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.QuickstartUtils.{convertToStringList, 
getQuickstartWriteConfigs}
-import org.apache.hudi.common.config.{HoodieReaderConfig, RecordMergeMode}
+import org.apache.hudi.common.config.RecordMergeMode
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util.Option
@@ -37,7 +37,7 @@ import org.apache.spark.sql.types.{DoubleType, StringType}
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.CsvSource
+import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
 
 import java.util.function.Consumer
 
@@ -69,14 +69,8 @@ class TestPartialUpdateAvroPayload extends 
HoodieClientTestBase {
   }
 
   @ParameterizedTest
-  @CsvSource(Array(
-    "COPY_ON_WRITE,false",
-    "MERGE_ON_READ,false",
-    "COPY_ON_WRITE,true",
-    "MERGE_ON_READ,true"
-  ))
-  def testPartialUpdatesAvroPayloadPrecombine(tableType: String, 
useFileGroupReader: Boolean): Unit = {
-    val hoodieTableType = HoodieTableType.valueOf(tableType)
+  @EnumSource(value = classOf[HoodieTableType])
+  def testPartialUpdatesAvroPayloadPrecombine(hoodieTableType: 
HoodieTableType): Unit = {
     val dataGenerator = new QuickstartUtils.DataGenerator()
     val records = convertToStringList(dataGenerator.generateInserts(1))
     val recordsRDD = spark.sparkContext.parallelize(records.asScala.toSeq, 2)
@@ -125,9 +119,7 @@ class TestPartialUpdateAvroPayload extends 
HoodieClientTestBase {
       .mode(SaveMode.Append)
       .save(basePath)
 
-    val finalDF = spark.read.format("hudi")
-      .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
String.valueOf(useFileGroupReader))
-      .load(basePath)
+    val finalDF = spark.read.format("hudi").load(basePath)
     assertEquals(finalDF.select("rider").collectAsList().get(0).getString(0), 
upsert1DF.select("rider").collectAsList().get(0).getString(0))
     assertEquals(finalDF.select("driver").collectAsList().get(0).getString(0), 
upsert2DF.select("driver").collectAsList().get(0).getString(0))
     assertEquals(finalDF.select("fare").collectAsList().get(0).getDouble(0), 
upsert3DF.select("fare").collectAsList().get(0).getDouble(0))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
index e54df91b0712..2ba74abbf8b4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
@@ -33,20 +33,13 @@ class TestSparkSqlWithTimestampKeyGenerator extends 
HoodieSparkSqlTestBase {
 
   test("Test Spark SQL with timestamp key generator") {
     withTempDir { tmp =>
-      Seq(
-        Seq("COPY_ON_WRITE", "true"),
-        Seq("COPY_ON_WRITE", "false"),
-        Seq("MERGE_ON_READ", "true"),
-        Seq("MERGE_ON_READ", "false")
-      ).foreach { testParams =>
-        val tableType = testParams(0)
+      Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType =>
         // enables use of engine agnostic file group reader
-        val shouldUseFileGroupReader = testParams(1)
 
         timestampKeyGeneratorSettings.foreach { keyGeneratorSettings =>
           withTable(generateTableName) { tableName =>
             // Warning level is used due to CI run with warn-log profile for 
quick failed cases identification
-            LOG.warn(s"Table '$tableName' with parameters: $testParams. 
Timestamp key generator settings: $keyGeneratorSettings")
+            LOG.warn(s"Table '$tableName'. Timestamp key generator settings: 
$keyGeneratorSettings")
             val tablePath = tmp.getCanonicalPath + "/" + tableName
             val tsType = if (keyGeneratorSettings.contains("DATE_STRING")) 
"string" else "long"
             spark.sql(
@@ -65,7 +58,6 @@ class TestSparkSqlWithTimestampKeyGenerator extends 
HoodieSparkSqlTestBase {
                  |   orderingFields = 'precomb',
                  |   hoodie.datasource.write.partitionpath.field = 'ts',
                  |   hoodie.datasource.write.hive_style_partitioning = 'false',
-                 |   hoodie.file.group.reader.enabled = 
'$shouldUseFileGroupReader',
                  |   hoodie.table.keygenerator.class = 
'org.apache.hudi.keygen.TimestampBasedKeyGenerator',
                  |   $keyGeneratorSettings
                  | )
@@ -80,7 +72,6 @@ class TestSparkSqlWithTimestampKeyGenerator extends 
HoodieSparkSqlTestBase {
               (dataBatchesWithLongOfSeconds, queryResultWithLongOfSeconds)
 
             withSQLConf(SPARK_SQL_INSERT_INTO_OPERATION.key -> "upsert",
-              "hoodie.file.group.reader.enabled" -> 
s"$shouldUseFileGroupReader",
               "hoodie.datasource.query.type" -> "snapshot") {
               // two partitions, one contains parquet file only, the second 
one contains parquet and log files for MOR, and two parquets for COW
               spark.sql(s"INSERT INTO $tableName VALUES ${dataBatches(0)}")
@@ -88,15 +79,7 @@ class TestSparkSqlWithTimestampKeyGenerator extends 
HoodieSparkSqlTestBase {
 
               val queryResult = spark.sql(s"SELECT id, name, precomb, ts FROM 
$tableName ORDER BY id").collect().mkString("; ")
               LOG.warn(s"Query result: $queryResult")
-              // TODO: use `shouldExtractPartitionValuesFromPartitionPath` 
uniformly, and get `expectedQueryResult` for all cases instead of 
`expectedQueryResultWithLossyString` for some cases
-              //   After it we could properly process filters like "WHERE ts 
BETWEEN 1078016000 and 1718953003" and add tests with partition pruning.
-              //   COW: Fix for [HUDI-3896] overwrites 
`shouldExtractPartitionValuesFromPartitionPath` in `BaseFileOnlyRelation`, 
therefore for COW we extracting from partition paths and get nulls
-              //   shouldUseFileGroupReader: [HUDI-7925] Currently there is no 
logic for `shouldExtractPartitionValuesFromPartitionPath` in 
`HoodieBaseHadoopFsRelationFactory`
-              //   UPDATE: with [HUDI-5807] we now have fg reader support. 
However partition pruning is still not fixed.
-              if (tableType == "COPY_ON_WRITE" && 
!shouldUseFileGroupReader.toBoolean)
-                assertResult(expectedQueryResultWithLossyString)(queryResult)
-              else
-                assertResult(expectedQueryResult)(queryResult)
+              assertResult(expectedQueryResult)(queryResult)
             }
           }
         }
@@ -202,5 +185,4 @@ object TestSparkSqlWithTimestampKeyGenerator {
   val queryResultWithLongOfSeconds: String = "[1,a1,1,1078016523]; 
[2,a3,1,1718952603]"
   val queryResultWithLongOfMilliseconds: String = "[1,a1,1,1078016523000]; 
[2,a3,1,1718952603000]"
   val queryResultWithString: String = "[1,a1,1,2004-02-29 01:02:03]; 
[2,a3,1,2024-06-21 06:50:03]"
-  val expectedQueryResultWithLossyString: String = "[1,a1,1,2004-02-29 01]; 
[2,a3,1,2024-06-21 06]"
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestVectorizedReadWithSchemaEvolution.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestVectorizedReadWithSchemaEvolution.scala
index 244b579fc587..5738621bea18 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestVectorizedReadWithSchemaEvolution.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestVectorizedReadWithSchemaEvolution.scala
@@ -27,8 +27,7 @@ class TestVectorizedReadWithSchemaEvolution extends 
HoodieSparkSqlTestBase {
           "hoodie.schema.on.read.enable" -> "true",
           "spark.sql.parquet.enableVectorizedReader" -> "true",
           "spark.sql.codegen.maxFields" -> "1",
-          "hoodie.parquet.small.file.limit" -> "0",
-          "hoodie.file.group.reader.enabled" -> "false"
+          "hoodie.parquet.small.file.limit" -> "0"
         ) {
           withTempDir { tmp =>
             val tableName = generateTableName
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable4.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable4.scala
index 1e8e5151a7f1..8d786fde8c8c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable4.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable4.scala
@@ -439,43 +439,6 @@ class TestInsertTable4 extends HoodieSparkSqlTestBase {
     }
   }
 
-  test("Test vectorized read nested columns for 
LegacyHoodieParquetFileFormat") {
-    withSQLConf(
-      "hoodie.datasource.read.use.new.parquet.file.format" -> "false",
-      "hoodie.file.group.reader.enabled" -> "false",
-      "spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true",
-      "spark.sql.parquet.enableVectorizedReader" -> "true") {
-      withTempDir { tmp =>
-        val tableName = generateTableName
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  id int,
-             |  name string,
-             |  attributes map<string, string>,
-             |  price double,
-             |  ts long,
-             |  dt string
-             |) using hudi
-             | tblproperties (primaryKey = 'id')
-             | partitioned by (dt)
-             | location '${tmp.getCanonicalPath}'
-                    """.stripMargin)
-        spark.sql(
-          s"""
-             | insert into $tableName values
-             | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000, 
'2021-01-05'),
-             | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000, 
'2021-01-06'),
-             | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000, 
'2021-01-07')
-                    """.stripMargin)
-        // Check the inserted records with map type attributes
-        checkAnswer(s"select id, name, price, ts, dt from $tableName where 
attributes.color = 'red'")(
-          Seq(1, "a1", 10.0, 1000, "2021-01-05")
-        )
-      }
-    }
-  }
-
   def ingestAndValidateDataNoPrecombine(tableType: String, tableName: String, 
tmp: File,
                                         expectedOperationtype: 
WriteOperationType,
                                         setOptions: (String, String)*): Unit = 
{

Reply via email to