This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 53725e8c91c2a904bf00d7a4e97556b7adca068c
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sat Mar 2 21:56:54 2024 -0800

    [HUDI-7469] Reduce redundant tests with Hudi record types (#10800)
---
 .../apache/hudi/functional/TestCOWDataSource.scala |  72 +++---
 .../apache/hudi/functional/TestMORDataSource.scala |  20 +-
 .../sql/hudi/TestAlterTableDropPartition.scala     |   4 +-
 .../spark/sql/hudi/TestCompactionTable.scala       |   4 +-
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 257 ++++++++++-----------
 .../apache/spark/sql/hudi/TestMergeIntoTable.scala |  24 +-
 .../spark/sql/hudi/TestMergeIntoTable2.scala       |  20 +-
 .../TestMergeIntoTableWithNonRecordKeyField.scala  |   8 +-
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  |  16 +-
 .../spark/sql/hudi/TestTimeTravelTable.scala       |  12 +-
 .../apache/spark/sql/hudi/TestUpdateTable.scala    |   6 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  63 +++--
 12 files changed, 242 insertions(+), 264 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index a28a228fd46..5614b414927 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -23,8 +23,9 @@ import 
org.apache.hudi.DataSourceWriteOptions.{INLINE_CLUSTERING_ENABLE, KEYGENE
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.QuickstartUtils.{convertToStringList, 
getQuickstartWriteConfigs}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
 TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
 import org.apache.hudi.common.config.HoodieMetadataConfig
+import 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
 TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, 
TimelineUtils}
@@ -44,7 +45,6 @@ import org.apache.hudi.metrics.{Metrics, MetricsReporterType}
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.util.JFunction
 import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceWriteOptions, HoodieDataSourceHelpers, QuickstartUtils, 
ScalaAssertionSupport}
-import org.apache.hudi.common.fs.FSUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -96,10 +96,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
     System.gc()
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
-  def testShortNameStorage(recordType: HoodieRecordType) {
-    val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
+  @Test
+  def testShortNameStorage(): Unit = {
+    val (writeOpts, readOpts) = getWriterReaderOpts()
 
     // Insert Operation
     val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
@@ -564,10 +563,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
    * archival should kick in and 2 commits should be archived. If schema is 
valid, no exception will be thrown. If not,
    * NPE will be thrown.
    */
-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
-  def testArchivalWithBulkInsert(recordType: HoodieRecordType): Unit = {
-    val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
+  @Test
+  def testArchivalWithBulkInsert(): Unit = {
+    val (writeOpts, readOpts) = getWriterReaderOpts()
 
     var structType: StructType = null
     for (i <- 1 to 7) {
@@ -696,10 +694,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
     }
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
-  def testOverWriteModeUseReplaceAction(recordType: HoodieRecordType): Unit = {
-    val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
+  @Test
+  def testOverWriteModeUseReplaceAction(): Unit = {
+    val (writeOpts, readOpts) = getWriterReaderOpts()
     val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
@@ -774,10 +771,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
     assertEquals(expectedCount, hudiReadPathDF.count())
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
-  def testOverWriteTableModeUseReplaceAction(recordType: HoodieRecordType): 
Unit = {
-    val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
+  @Test
+  def testOverWriteTableModeUseReplaceAction(): Unit = {
+    val (writeOpts, readOpts) = getWriterReaderOpts()
 
     val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
@@ -804,10 +800,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
     assertEquals("replacecommit", commits(1))
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
-  def testOverWriteModeUseReplaceActionOnDisJointPartitions(recordType: 
HoodieRecordType): Unit = {
-    val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
+  @Test
+  def testOverWriteModeUseReplaceActionOnDisJointPartitions(): Unit = {
+    val (writeOpts, readOpts) = getWriterReaderOpts()
 
     // step1: Write 5 records to hoodie table for partition1 
DEFAULT_FIRST_PARTITION_PATH
     val records1 = recordsToStrings(dataGen.generateInsertsForPartition("001", 
5, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
@@ -864,10 +859,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
     assertEquals("replacecommit", commits(2))
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
-  def testOverWriteTableModeUseReplaceActionOnDisJointPartitions(recordType: 
HoodieRecordType): Unit = {
-    val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
+  @Test
+  def testOverWriteTableModeUseReplaceActionOnDisJointPartitions(): Unit = {
+    val (writeOpts, readOpts) = getWriterReaderOpts()
 
     // step1: Write 5 records to hoodie table for partition1 
DEFAULT_FIRST_PARTITION_PATH
     val records1 = recordsToStrings(dataGen.generateInsertsForPartition("001", 
5, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
@@ -1003,10 +997,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
     })
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
-  def testWithAutoCommitOn(recordType: HoodieRecordType): Unit = {
-    val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
+  @Test
+  def testWithAutoCommitOn(): Unit = {
+    val (writeOpts, readOpts) = getWriterReaderOpts()
 
     val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
@@ -1318,8 +1311,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
 
   @ParameterizedTest
   @CsvSource(Array(
-    "true,false,AVRO", "true,true,AVRO", "false,true,AVRO", "false,false,AVRO",
-    "true,false,SPARK", "true,true,SPARK", "false,true,SPARK", 
"false,false,SPARK"
+    "true,false,AVRO", "true,true,AVRO", "false,true,AVRO", "false,false,AVRO"
   ))
   def testQueryCOWWithBasePathAndFileIndex(partitionEncode: Boolean, 
isMetadataEnabled: Boolean, recordType: HoodieRecordType): Unit = {
     testPartitionPruning(
@@ -1516,11 +1508,10 @@ class TestCOWDataSource extends 
HoodieSparkClientTestBase with ScalaAssertionSup
     }
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
-  def testSaveAsTableInDifferentModes(recordType: HoodieRecordType): Unit = {
+  @Test
+  def testSaveAsTableInDifferentModes(): Unit = {
     val options = scala.collection.mutable.Map.empty ++ commonOpts ++ 
Map("path" -> basePath)
-    val (writeOpts, readOpts) = getWriterReaderOpts(recordType, options.toMap)
+    val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO, 
options.toMap)
 
     // first use the Overwrite mode
     val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
@@ -1583,10 +1574,9 @@ class TestCOWDataSource extends 
HoodieSparkClientTestBase with ScalaAssertionSup
     
assertEquals(spark.read.format("hudi").options(readOpts).load(basePath).count(),
 9)
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
-  def testMetricsReporterViaDataSource(recordType: HoodieRecordType): Unit = {
-    val (writeOpts, _) = getWriterReaderOpts(recordType, 
getQuickstartWriteConfigs.asScala.toMap)
+  @Test
+  def testMetricsReporterViaDataSource(): Unit = {
+    val (writeOpts, _) = getWriterReaderOpts(HoodieRecordType.AVRO, 
getQuickstartWriteConfigs.asScala.toMap)
 
     val dataGenerator = new QuickstartUtils.DataGenerator()
     val records = convertToStringList(dataGenerator.generateInserts(10))
@@ -1680,7 +1670,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
       })
   }
 
-  def getWriterReaderOpts(recordType: HoodieRecordType,
+  def getWriterReaderOpts(recordType: HoodieRecordType = HoodieRecordType.AVRO,
                           opt: Map[String, String] = commonOpts,
                           enableFileIndex: Boolean = 
DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()):
   (Map[String, String], Map[String, String]) = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index b1d3a17004b..45bd3c645d4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -27,7 +27,7 @@ import 
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
-import org.apache.hudi.common.testutils.RawTripTestPayload.{recordToString, 
recordsToStrings}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.util
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, 
HoodieWriteConfig}
 import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
@@ -44,7 +44,7 @@ import org.apache.spark.sql.types.BooleanType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
 import org.slf4j.LoggerFactory
 
 import java.util.function.Consumer
@@ -948,10 +948,9 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
     assertEquals(20, 
spark.read.format("hudi").options(readOpts).load(basePath).count())
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
-  def testTempFilesCleanForClustering(recordType: HoodieRecordType): Unit = {
-    val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
+  @Test
+  def testTempFilesCleanForClustering(): Unit = {
+    val (writeOpts, readOpts) = getWriterReaderOpts()
 
     val records1 = recordsToStrings(dataGen.generateInserts("001", 
1000)).asScala
     val inputDF1: Dataset[Row] = 
spark.read.json(spark.sparkContext.parallelize(records1, 2))
@@ -1230,9 +1229,8 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
    * The read-optimized query should read `fg1_dc1.parquet` only in this case.
    */
   @ParameterizedTest
-  @CsvSource(Array("true,AVRO", "true,SPARK", "false,AVRO", "false,SPARK"))
-  def 
testReadOptimizedQueryAfterInflightCompactionAndCompletedDeltaCommit(enableFileIndex:
 Boolean,
-                                                                           
recordType: HoodieRecordType): Unit = {
+  @ValueSource(booleans = Array(true, false))
+  def 
testReadOptimizedQueryAfterInflightCompactionAndCompletedDeltaCommit(enableFileIndex:
 Boolean): Unit = {
     val (tableName, tablePath) = ("hoodie_mor_ro_read_test_table", 
s"${basePath}_mor_test_table")
     val precombineField = "col3"
     val recordKeyField = "key"
@@ -1250,7 +1248,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       "hoodie.upsert.shuffle.parallelism" -> "1")
     val pathForROQuery = getPathForROQuery(tablePath, !enableFileIndex, 0)
 
-    val (writeOpts, readOpts) = getWriterReaderOpts(recordType, options, 
enableFileIndex)
+    val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO, 
options, enableFileIndex)
 
     // First batch with all inserts
     // Deltacommit1 (DC1, completed), writing file group 1 (fg1)
@@ -1383,7 +1381,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
     assertEquals(inputRows, readRows)
   }
 
-  def getWriterReaderOpts(recordType: HoodieRecordType,
+  def getWriterReaderOpts(recordType: HoodieRecordType = HoodieRecordType.AVRO,
                           opt: Map[String, String] = commonOpts,
                           enableFileIndex: Boolean = 
DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()):
   (Map[String, String], Map[String, String]) = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index 2c592f5a815..7a146591f4e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -621,7 +621,7 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
   }
 
   test("Test drop partition with wildcards") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         val tableName = generateTableName
         spark.sql(
@@ -653,6 +653,6 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
           Seq("2023-09-01")
         )
       }
-    })
+    }
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
index 568e3569725..5ded75dcdab 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
@@ -75,7 +75,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
   }
 
   test("Test compaction path") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       val tableName = generateTableName
       spark.sql(
         s"""
@@ -132,6 +132,6 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
       checkException(s"run compaction on '${tmp.getCanonicalPath}' at 12345")(
         s"specific 12345 instants is not exist"
       )
-    })
+    }
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 602881b6d2d..38f2e4e428c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -38,7 +38,7 @@ import java.io.File
 class TestInsertTable extends HoodieSparkSqlTestBase {
 
   test("Test table type name incase-sensitive test") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       val targetTable = generateTableName
       val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
 
@@ -86,7 +86,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         Seq("1", "aa", 123, "2024-02-19", 10),
         Seq("2", "bb", 456, "2024-02-19", 10)
       )
-    })
+    }
   }
 
   test("Test Insert Into with values") {
@@ -125,7 +125,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
   test("Test Insert Into with static partition") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       val tableName = generateTableName
       // Create a partitioned table
       spark.sql(
@@ -174,11 +174,11 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         Seq(2, "a2", 20.0, 2000, "2021-01-06"),
         Seq(3, "a3", 30.0, 3000, "2021-01-07")
       )
-    })
+    }
   }
 
   test("Test Insert Into with dynamic partition") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       val tableName = generateTableName
       // Create a partitioned table
       spark.sql(
@@ -228,7 +228,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         Seq(2, "a2", 20.0, 2000, "2021-01-06"),
         Seq(3, "a3", 30.0, 3000, "2021-01-07")
       )
-    })
+    }
   }
 
   test("Test Insert Into with multi partition") {
@@ -409,7 +409,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
   test("Test Insert Overwrite") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         withTable(generateTableName) { tableName =>
           // Create a partitioned table
@@ -554,7 +554,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
           )
         }
       }
-    })
+    }
   }
 
   test("Test insert overwrite for multi partitioned table") {
@@ -656,19 +656,19 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
   test("Test Different Type of Partition Column") {
-   withRecordType()(withTempDir { tmp =>
-     val typeAndValue = Seq(
-       ("string", "'1000'"),
-       ("int", 1000),
-       ("bigint", 10000),
-       ("timestamp", "TIMESTAMP'2021-05-20 00:00:00'"),
-       ("date", "DATE'2021-05-20'")
-     )
-     typeAndValue.foreach { case (partitionType, partitionValue) =>
-       val tableName = generateTableName
-       validateDifferentTypesOfPartitionColumn(tmp, partitionType, 
partitionValue, tableName)
-     }
-   })
+    withTempDir { tmp =>
+      val typeAndValue = Seq(
+        ("string", "'1000'"),
+        ("int", 1000),
+        ("bigint", 10000),
+        ("timestamp", "TIMESTAMP'2021-05-20 00:00:00'"),
+        ("date", "DATE'2021-05-20'")
+      )
+      typeAndValue.foreach { case (partitionType, partitionValue) =>
+        val tableName = generateTableName
+        validateDifferentTypesOfPartitionColumn(tmp, partitionType, 
partitionValue, tableName)
+      }
+    }
   }
 
   test("Test TimestampType Partition Column With Consistent Logical Timestamp 
Enabled") {
@@ -686,7 +686,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
   test("Test insert for uppercase table name") {
-    withRecordType()(withTempDir{ tmp =>
+    withTempDir { tmp =>
       val tableName = s"H_$generateTableName"
       if (HoodieSparkUtils.gteqSpark3_5) {
         // [SPARK-44284] Spark 3.5+ requires conf below to be case sensitive
@@ -713,84 +713,82 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         .setConf(spark.sessionState.newHadoopConf())
         .build()
       assertResult(tableName)(metaClient.getTableConfig.getTableName)
-    })
+    }
   }
 
   test("Test Insert Exception") {
-    withRecordType() {
-      val tableName = generateTableName
+    val tableName = generateTableName
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  dt string
+         |) using hudi
+         | tblproperties (primaryKey = 'id')
+         | partitioned by (dt)
+     """.stripMargin)
+    val tooManyDataColumnsErrorMsg = if (HoodieSparkUtils.gteqSpark3_5) {
+      s"""
+         |[INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS] Cannot write to 
`spark_catalog`.`default`.`$tableName`, the reason is too many data columns:
+         |Table columns: `id`, `name`, `price`.
+         |Data columns: `1`, `a1`, `10`, `2021-06-20`.
+         |""".stripMargin
+    } else if (HoodieSparkUtils.gteqSpark3_4) {
+      """
+        |too many data columns:
+        |Table columns: 'id', 'name', 'price'.
+        |Data columns: '1', 'a1', '10', '2021-06-20'.
+        |""".stripMargin
+    } else {
+      """
+        |too many data columns:
+        |Table columns: 'id', 'name', 'price'
+        |Data columns: '1', 'a1', '10', '2021-06-20'
+        |""".stripMargin
+    }
+    checkExceptionContain(s"insert into $tableName partition(dt = 
'2021-06-20') select 1, 'a1', 10, '2021-06-20'")(
+      tooManyDataColumnsErrorMsg)
+
+    val notEnoughDataColumnsErrorMsg = if (HoodieSparkUtils.gteqSpark3_5) {
+      s"""
+         |[INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS] Cannot write 
to `spark_catalog`.`default`.`$tableName`, the reason is not enough data 
columns:
+         |Table columns: `id`, `name`, `price`, `dt`.
+         |Data columns: `1`, `a1`, `10`.
+         |""".stripMargin
+    } else if (HoodieSparkUtils.gteqSpark3_4) {
+      """
+        |not enough data columns:
+        |Table columns: 'id', 'name', 'price', 'dt'.
+        |Data columns: '1', 'a1', '10'.
+        |""".stripMargin
+    } else {
+      """
+        |not enough data columns:
+        |Table columns: 'id', 'name', 'price', 'dt'
+        |Data columns: '1', 'a1', '10'
+        |""".stripMargin
+    }
+    checkExceptionContain(s"insert into $tableName select 1, 'a1', 
10")(notEnoughDataColumnsErrorMsg)
+    withSQLConf("hoodie.sql.bulk.insert.enable" -> "true", 
"hoodie.sql.insert.mode" -> "strict") {
+      val tableName2 = generateTableName
       spark.sql(
         s"""
-           |create table $tableName (
+           |create table $tableName2 (
            |  id int,
            |  name string,
            |  price double,
-           |  dt string
+           |  ts long
            |) using hudi
-           | tblproperties (primaryKey = 'id')
-           | partitioned by (dt)
-       """.stripMargin)
-      val tooManyDataColumnsErrorMsg = if (HoodieSparkUtils.gteqSpark3_5) {
-        s"""
-          |[INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS] Cannot write 
to `spark_catalog`.`default`.`$tableName`, the reason is too many data columns:
-          |Table columns: `id`, `name`, `price`.
-          |Data columns: `1`, `a1`, `10`, `2021-06-20`.
-          |""".stripMargin
-      } else if (HoodieSparkUtils.gteqSpark3_4) {
-        """
-          |too many data columns:
-          |Table columns: 'id', 'name', 'price'.
-          |Data columns: '1', 'a1', '10', '2021-06-20'.
-          |""".stripMargin
-      } else {
-        """
-          |too many data columns:
-          |Table columns: 'id', 'name', 'price'
-          |Data columns: '1', 'a1', '10', '2021-06-20'
-          |""".stripMargin
-      }
-      checkExceptionContain(s"insert into $tableName partition(dt = 
'2021-06-20') select 1, 'a1', 10, '2021-06-20'")(
-        tooManyDataColumnsErrorMsg)
-
-      val notEnoughDataColumnsErrorMsg = if (HoodieSparkUtils.gteqSpark3_5) {
-        s"""
-          |[INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS] Cannot write 
to `spark_catalog`.`default`.`$tableName`, the reason is not enough data 
columns:
-          |Table columns: `id`, `name`, `price`, `dt`.
-          |Data columns: `1`, `a1`, `10`.
-          |""".stripMargin
-      } else if (HoodieSparkUtils.gteqSpark3_4) {
-        """
-          |not enough data columns:
-          |Table columns: 'id', 'name', 'price', 'dt'.
-          |Data columns: '1', 'a1', '10'.
-          |""".stripMargin
-      } else {
-        """
-          |not enough data columns:
-          |Table columns: 'id', 'name', 'price', 'dt'
-          |Data columns: '1', 'a1', '10'
-          |""".stripMargin
-      }
-      checkExceptionContain(s"insert into $tableName select 1, 'a1', 
10")(notEnoughDataColumnsErrorMsg)
-      withSQLConf("hoodie.sql.bulk.insert.enable" -> "true", 
"hoodie.sql.insert.mode" -> "strict") {
-        val tableName2 = generateTableName
-        spark.sql(
-          s"""
-             |create table $tableName2 (
-             |  id int,
-             |  name string,
-             |  price double,
-             |  ts long
-             |) using hudi
-             | tblproperties (
-             |   primaryKey = 'id',
-             |   preCombineField = 'ts'
-             | )
-          """.stripMargin)
-        checkException(s"insert into $tableName2 values(1, 'a1', 10, 1000)")(
-          "Table with primaryKey can not use bulk insert in strict mode."
-        )
-      }
+           | tblproperties (
+           |   primaryKey = 'id',
+           |   preCombineField = 'ts'
+           | )
+        """.stripMargin)
+      checkException(s"insert into $tableName2 values(1, 'a1', 10, 1000)")(
+        "Table with primaryKey can not use bulk insert in strict mode."
+      )
     }
   }
 
@@ -823,8 +821,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
 
   test("Test bulk insert with insert into for single partitioned table") {
     withSQLConf("hoodie.sql.insert.mode" -> "non-strict") {
-      withRecordType()(withTempDir { tmp =>
-        Seq("cow", "mor").foreach {tableType =>
+      withTempDir { tmp =>
+        Seq("cow", "mor").foreach { tableType =>
           withTable(generateTableName) { tableName =>
             spark.sql(
               s"""
@@ -867,7 +865,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
             )
           }
         }
-      })
+      }
     }
   }
 
@@ -956,7 +954,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   test("Test bulk insert with CTAS") {
     withSQLConf("hoodie.sql.insert.mode" -> "non-strict",
       "hoodie.sql.bulk.insert.enable" -> "true") {
-      withRecordType()(withTempDir { tmp =>
+      withTempDir { tmp =>
         Seq("cow", "mor").foreach { tableType =>
           withTable(generateTableName) { inputTable =>
             spark.sql(
@@ -998,13 +996,13 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
             }
           }
         }
-      })
+      }
     }
   }
 
   test("Test bulk insert with empty dataset") {
     withSQLConf(SPARK_SQL_INSERT_INTO_OPERATION.key -> 
WriteOperationType.BULK_INSERT.value()) {
-      withRecordType()(withTempDir { tmp =>
+      withTempDir { tmp =>
         Seq("cow", "mor").foreach { tableType =>
           withTable(generateTableName) { inputTable =>
             spark.sql(
@@ -1042,7 +1040,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
             }
           }
         }
-      })
+      }
     }
   }
 
@@ -1054,7 +1052,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         Array()
       }
       withSQLConf(bulkInsertConf: _*) {
-        withRecordType()(withTempDir { tmp =>
+        withTempDir { tmp =>
           Seq("cow", "mor").foreach { tableType =>
             withTable(generateTableName) { inputTable =>
               spark.sql(
@@ -1098,14 +1096,14 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
               }
             }
           }
-        })
+        }
       }
     }
   }
 
   test("Test bulk insert with insert overwrite table") {
     withSQLConf(SPARK_SQL_INSERT_INTO_OPERATION.key -> 
WriteOperationType.BULK_INSERT.value()) {
-      withRecordType()(withTempDir { tmp =>
+      withTempDir { tmp =>
         Seq("cow", "mor").foreach { tableType =>
           withTable(generateTableName) { nonPartitionedTable =>
             spark.sql(
@@ -1132,13 +1130,13 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
             }
           }
         }
-      })
+      }
     }
   }
 
   test("Test bulk insert with insert overwrite partition") {
     withSQLConf(SPARK_SQL_INSERT_INTO_OPERATION.key -> 
WriteOperationType.BULK_INSERT.value()) {
-      withRecordType()(withTempDir { tmp =>
+      withTempDir { tmp =>
         Seq("cow", "mor").foreach { tableType =>
           withTable(generateTableName) { partitionedTable =>
             spark.sql(
@@ -1179,7 +1177,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
             }
           }
         }
-      })
+      }
     }
   }
 
@@ -1355,7 +1353,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
 
   test("Test Insert Into With Catalog Identifier for spark >= 3.2.0") {
     Seq("hudi", "parquet").foreach { format =>
-      withRecordType()(withTempDir { tmp =>
+      withTempDir { tmp =>
         val tableName = s"spark_catalog.default.$generateTableName"
         // Create a partitioned table
         if (HoodieSparkUtils.gteqSpark3_2) {
@@ -1392,7 +1390,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
             Seq(2, "a2", 10.0, 1000, "2021-01-05")
           )
         }
-      })
+      }
     }
   }
 
@@ -1663,7 +1661,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   test("Test Insert Overwrite Into Bucket Index Table") {
     withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") {
       Seq("mor", "cow").foreach { tableType =>
-        withRecordType()(withTempDir { tmp =>
+        withTempDir { tmp =>
           val tableName = generateTableName
           // Create a partitioned table
           spark.sql(
@@ -1708,14 +1706,14 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
           checkAnswer(s"select id, name, price, ts, dt from $tableName order 
by dt")(
             Seq(13, "a2", 12.0, 1000, "2021-01-05")
           )
-        })
+        }
       }
     }
   }
 
   test("Test Insert Overwrite Into Consistent Bucket Index Table") {
     withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") {
-      withRecordType()(withTempDir { tmp =>
+      withTempDir { tmp =>
         val tableName = generateTableName
         // Create a partitioned table
         spark.sql(
@@ -1768,13 +1766,13 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         checkAnswer(s"select id, name, price, ts, dt from $tableName order by 
dt")(
           Seq(13, "a3", 12.0, 1000, "2021-01-05")
         )
-      })
+      }
     }
   }
 
   test("Test Hudi should not record empty preCombineKey in hoodie.properties") 
{
     withSQLConf("hoodie.datasource.write.operation" -> "insert") {
-      withRecordType()(withTempDir { tmp =>
+      withTempDir { tmp =>
         val tableName = generateTableName
         spark.sql(
           s"""
@@ -1802,7 +1800,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
           Seq(2, "name2", 12.0),
           Seq(3, "name3", 13.0)
         )
-      })
+      }
     }
   }
 
@@ -2004,17 +2002,17 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
     spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
     spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         withTable(generateTableName) { tableName =>
           ingestAndValidateData(tableType, tableName, tmp, 
WriteOperationType.UPSERT)
         }
       }
-    })
+    }
   }
 
   test("Test sql write operation with INSERT_INTO override both strict mode 
and sql write operation") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         Seq(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT, 
WriteOperationType.UPSERT).foreach { operation =>
           withTable(generateTableName) { tableName =>
@@ -2023,11 +2021,11 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
           }
         }
       }
-    })
+    }
   }
 
   test("Test sql write operation with INSERT_INTO override only sql write 
operation") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         Seq(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT, 
WriteOperationType.UPSERT).foreach { operation =>
           withTable(generateTableName) { tableName =>
@@ -2036,7 +2034,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
           }
         }
       }
-    })
+    }
   }
 
   test("Test sql write operation with INSERT_INTO override only strict mode") {
@@ -2045,14 +2043,14 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     
spark.sessionState.conf.unsetConf(DataSourceWriteOptions.INSERT_DUP_POLICY.key())
     spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
     spark.sessionState.conf.unsetConf("hoodie.sql.bulk.insert.enable")
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         withTable(generateTableName) { tableName =>
           ingestAndValidateData(tableType, tableName, tmp, 
WriteOperationType.UPSERT,
             List("set hoodie.sql.insert.mode = upsert"))
         }
       }
-    })
+    }
   }
 
   def ingestAndValidateData(tableType: String, tableName: String, tmp: File,
@@ -2126,17 +2124,17 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode")
     spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
     spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
-    withRecordType()(withTempDir { tmp =>
-      Seq("cow","mor").foreach { tableType =>
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
         withTable(generateTableName) { tableName =>
           ingestAndValidateDataNoPrecombine(tableType, tableName, tmp, 
WriteOperationType.INSERT)
         }
       }
-    })
+    }
   }
 
   test("Test inaccurate index type") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       val targetTable = generateTableName
 
       assertThrows[IllegalArgumentException] {
@@ -2164,7 +2162,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
                |""".stripMargin)
         }
       }
-    })
+    }
   }
 
   test("Test vectorized read nested columns for 
LegacyHoodieParquetFileFormat") {
@@ -2270,7 +2268,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
   test("Test insert dup policy with INSERT_INTO explicit new configs INSERT 
operation ") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         val operation = WriteOperationType.INSERT
         Seq(NONE_INSERT_DUP_POLICY, DROP_INSERT_DUP_POLICY).foreach { 
dupPolicy =>
@@ -2282,11 +2280,11 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
           }
         }
       }
-    })
+    }
   }
 
   test("Test insert dup policy with INSERT_INTO explicit new configs 
BULK_INSERT operation ") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       Seq("cow").foreach { tableType =>
         val operation = WriteOperationType.BULK_INSERT
         val dupPolicy = NONE_INSERT_DUP_POLICY
@@ -2297,7 +2295,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
             dupPolicy)
         }
       }
-    })
+    }
   }
 
   test("Test DROP insert dup policy with INSERT_INTO explicit new configs BULK 
INSERT operation") {
@@ -2364,7 +2362,6 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     })
   }
 
-
   def ingestAndValidateDataDupPolicy(tableType: String, tableName: String, 
tmp: File,
                                      expectedOperationtype: WriteOperationType 
= WriteOperationType.INSERT,
                                      setOptions: List[String] = List.empty,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 90398f4689f..b56ca09ab96 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -130,7 +130,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
   test("Test MergeInto with more than once update actions for spark >= 3.1.x") 
{
 
     if (HoodieSparkUtils.gteqSpark3_1) {
-      withRecordType()(withTempDir { tmp =>
+      withTempDir { tmp =>
         val targetTable = generateTableName
         spark.sql(
           s"""
@@ -179,7 +179,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
         checkAnswer(s"select id, name, data, country, ts from $targetTable")(
           Seq(1, "lb", 5, "shu", 1646643196L)
         )
-      })
+      }
     }
   }
 
@@ -469,7 +469,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
   }
 
   test("Test MergeInto for MOR table ") {
-    withRecordType()(withTempDir {tmp =>
+    withTempDir {tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       val tableName = generateTableName
       // Create a mor partitioned table.
@@ -597,11 +597,11 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
       checkAnswer(s"select id,name,price,dt from $tableName order by id")(
         Seq(1, "a1", 12, "2021-03-21")
       )
-    })
+    }
   }
 
   test("Test MergeInto with insert only") {
-    withRecordType()(withTempDir {tmp =>
+    withTempDir {tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       // Create a partitioned mor table
       val tableName = generateTableName
@@ -652,7 +652,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
         Seq(1, "a1", 10, "2021-03-21"),
         Seq(2, "a2", 10, "2021-03-20")
       )
-    })
+    }
   }
 
   test("Test MergeInto For PreCombineField") {
@@ -730,7 +730,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
   }
 
   test("Test MergeInto with preCombine field expression") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       Seq("cow", "mor").foreach { tableType =>
         val tableName1 = generateTableName
@@ -808,11 +808,11 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
           Seq(1, "a1", 24, "2021-03-21", 1002)
         )
       }
-    })
+    }
   }
 
   test("Test MergeInto with primaryKey expression") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       val tableName1 = generateTableName
       spark.sql(
@@ -908,7 +908,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
       checkAnswer(s"select id,name,price,v,dt from $tableName1 order by id")(
         Seq(1, "a1", 10, 1000, "2021-03-21")
       )
-    })
+    }
   }
 
   test("Test MergeInto with combination of delete update insert") {
@@ -1082,7 +1082,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
   }
 
   test("Test Different Type of PreCombineField") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       val typeAndValue = Seq(
         ("string", "'1000'"),
@@ -1138,7 +1138,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
           Seq(1, "a1", 20.0)
         )
       }
-    })
+    }
   }
 
   test("Test MergeInto For MOR With Compaction On") {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index ef76cb72ca5..8ea7284e840 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -142,7 +142,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
   }
 
   test("Test Merge Into CTAS Table") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       val tableName = generateTableName
       spark.sql(
@@ -174,7 +174,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
       checkAnswer(s"select id, name from $tableName")(
         Seq(1, "a1_1")
       )
-    })
+    }
   }
 
   test("Test Merge With Complex Data Type") {
@@ -242,7 +242,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
   }
 
   test("Test column name matching for insert * and update set *") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       val tableName = generateTableName
       // Create table
@@ -326,11 +326,11 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
         Seq(3, "a3", 102.0, 1000, "2021-05-05"),
         Seq(4, "a4", 100.0, 1000, "2021-05-06")
       )
-    })
+    }
   }
 
   test("Test MergeInto For Source Table With Column Aliases") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       val tableName = generateTableName
       // Create table
@@ -370,7 +370,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
           Seq(1, "a1", 10.0, 1000)
         )
       }
-    })
+    }
   }
 
   /* TODO [HUDI-6472]
@@ -556,7 +556,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
 */
 
   test("Test only insert when source table contains history") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       val tableName = generateTableName
       // Create table
@@ -598,7 +598,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
         Seq(1, "a1", 1.0, 10, "2022-08-18"),
         Seq(2, "a2", 10.0, 100, "2022-08-18")
       )
-    })
+    }
   }
 
   test("Test only insert when source table contains history and target table 
has multiple keys") {
@@ -649,7 +649,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
   }
 
   test("Test Merge Into For Source Table With Different Column Order") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       val tableName = generateTableName
       // Create a mor partitioned table.
       spark.sql(
@@ -683,7 +683,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
       checkAnswer(s"select id,name,price,dt from $tableName")(
         Seq(1, "a1", 10, "2021-03-21")
       )
-    })
+    }
   }
 
   test("Test Merge into with String cast to Double") {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
index 419bb43de43..dae2dda4bfa 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
@@ -247,7 +247,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends 
HoodieSparkSqlTestBase wit
 
   test("Test pkless multiple source match") {
     for (withPrecombine <- Seq(true, false)) {
-      withRecordType()(withTempDir { tmp =>
+      withTempDir { tmp =>
         spark.sql("set hoodie.payload.combined.schema.validate = true")
         val tableName = generateTableName
 
@@ -292,13 +292,13 @@ class TestMergeIntoTableWithNonRecordKeyField extends 
HoodieSparkSqlTestBase wit
             Seq(1, "a1", 30.0, 100)
           )
         }
-      })
+      }
     }
 
   }
 
   test("Test MergeInto Basic pkless") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
       spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
       val tableName = generateTableName
@@ -385,6 +385,6 @@ class TestMergeIntoTableWithNonRecordKeyField extends 
HoodieSparkSqlTestBase wit
        """.stripMargin)
       val cnt = spark.sql(s"select * from $tableName where id = 1").count()
       assertResult(0)(cnt)
-    })
+    }
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 6a64c69021c..bfd14ae4c5a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -143,7 +143,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
   }
 
   test("Test alter column types 2") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         val tableName = generateTableName
         val tablePath = s"${new Path(tmp.getCanonicalPath, 
tableName).toUri.toString}"
@@ -176,7 +176,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           )
         }
       }
-    })
+    }
   }
 
   test("Test Enable and Disable Schema on read") {
@@ -232,7 +232,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
   }
 
   test("Test alter table properties and add rename drop column") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         val tableName = generateTableName
         val tablePath = s"${new Path(tmp.getCanonicalPath, 
tableName).toUri.toString}"
@@ -336,7 +336,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
         }
       }
       spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
-    })
+    }
   }
 
   test("Test Chinese table ") {
@@ -393,7 +393,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
 
 
   test("Test alter column by add rename and drop") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         val tableName = generateTableName
         val tablePath = s"${new Path(tmp.getCanonicalPath, 
tableName).toUri.toString}"
@@ -453,7 +453,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           validateInternalSchema(tablePath, isDropColumn = false, 
currentMaxColumnId = maxColumnId)
         }
       }
-    })
+    }
   }
 
   private def validateInternalSchema(basePath: String, isDropColumn: Boolean, 
currentMaxColumnId: Int): Unit = {
@@ -543,7 +543,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
   }
 
   test("Test alter column with complex schema") {
-    withRecordType()(withTempDir { tmp =>
+    withTempDir { tmp =>
       withSQLConf(s"$SPARK_SQL_INSERT_INTO_OPERATION" -> "upsert",
         "hoodie.schema.on.read.enable" -> "true",
         "spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") {
@@ -628,7 +628,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           )
         }
       }
-    })
+    }
   }
 
   test("Test schema auto evolution complex") {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala
index 73bad3be282..e6275d22e62 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala
@@ -69,7 +69,7 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
 
   test("Test Insert Into Records with time travel To new Table") {
     if (HoodieSparkUtils.gteqSpark3_2) {
-      withRecordType()(withTempDir { tmp =>
+      withTempDir { tmp =>
         // Create Non-Partitioned table
         val tableName1 = generateTableName
         spark.sql(
@@ -138,7 +138,7 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
           Seq(1, "a1", 10.0, 1000, "2022-02-14"),
           Seq(2, "a2", 10.0, 1000, "2022-02-15")
         )
-      })
+      }
     }
   }
 
@@ -238,18 +238,18 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
   test("Test Unsupported syntax can be parsed") {
     if (HoodieSparkUtils.gteqSpark3_2) {
       checkAnswer("select 1 distribute by 1")(Seq(1))
-      withRecordType()(withTempDir { dir =>
+      withTempDir { dir =>
         val path = dir.toURI.getPath
         spark.sql(s"insert overwrite local directory '$path' using parquet 
select 1")
         // Requires enable hive support, so didn't test it
         // spark.sql(s"insert overwrite local directory '$path' stored as orc 
select 1")
-      })
+      }
     }
   }
 
   test("Test Select Record with time travel and Repartition") {
     if (HoodieSparkUtils.gteqSpark3_2) {
-      withRecordType()(withTempDir { tmp =>
+      withTempDir { tmp =>
         val tableName = generateTableName
         spark.sql(
           s"""
@@ -289,7 +289,7 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
           s"select id, name, price, ts from $tableName TIMESTAMP AS OF 
'$instant1' distribute by cast(rand() * 2 as int)")(
           Seq(1, "a1", 10.0, 1000)
         )
-      })
+      }
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
index 0c2c34ae6d9..7c7fc70d3f3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
@@ -233,8 +233,8 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
   }
 
   test("Test ignoring case for Update Table") {
-    withRecordType()(withTempDir { tmp =>
-      Seq("cow", "mor").foreach {tableType =>
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
         val tableName = generateTableName
         // create table
         spark.sql(
@@ -270,7 +270,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
           Seq(1, "a1", 40.0, 1000)
         )
       }
-    })
+    }
   }
 
   test("Test decimal type") {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 502baf34ff4..263389af698 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -644,12 +644,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", 
recordType);
   }
 
-  @ParameterizedTest
-  @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
-  public void testUpsertsCOW_ContinuousModeDisabled(HoodieRecordType 
recordType) throws Exception {
+  @Test
+  public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
     String tableBasePath = basePath + "/non_continuous_cow";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
-    addRecordMerger(recordType, cfg.configs);
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
     cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), 
MetricsReporterType.INMEMORY.name()));
@@ -675,12 +673,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor", 
recordType);
   }
 
-  @ParameterizedTest
-  @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
-  public void testUpsertsMOR_ContinuousModeDisabled(HoodieRecordType 
recordType) throws Exception {
+  @Test
+  public void testUpsertsMOR_ContinuousModeDisabled() throws Exception {
     String tableBasePath = basePath + "/non_continuous_mor";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
-    addRecordMerger(recordType, cfg.configs);
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
     cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
     cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), 
MetricsReporterType.INMEMORY.name()));
@@ -878,8 +874,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   @ParameterizedTest
-  @CsvSource(value = {"true, AVRO", "true, SPARK", "false, AVRO", "false, 
SPARK"})
-  public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean, 
HoodieRecordType recordType) throws Exception {
+  @ValueSource(booleans = {true, false})
+  public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) 
throws Exception {
     String tableBasePath = basePath + "/cleanerDeleteReplacedDataWithArchive" 
+ asyncClean;
 
     int totalRecords = 3000;
@@ -887,7 +883,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // Step 1 : Prepare and insert data without archival and cleaner.
     // Make sure that there are 6 commits including 2 replacecommits completed.
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT);
-    addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(HoodieRecordType.AVRO, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "true", 
"2", "", ""));
@@ -956,7 +952,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       configs.add(String.format("%s=%s", 
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
           InProcessLockProvider.class.getName()));
     }
-    addRecordMerger(recordType, configs);
+    addRecordMerger(HoodieRecordType.AVRO, configs);
     cfg.configs = configs;
     cfg.continuousMode = false;
     // timeline as of now. no cleaner and archival kicked in.
@@ -1188,16 +1184,15 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   @Timeout(600)
-  @ParameterizedTest
-  @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
-  public void testAsyncClusteringServiceWithCompaction(HoodieRecordType 
recordType) throws Exception {
+  @Test
+  public void testAsyncClusteringServiceWithCompaction() throws Exception {
     String tableBasePath = basePath + "/asyncClusteringCompaction";
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 2000;
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT);
-    addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(HoodieRecordType.AVRO, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
     cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "", "", 
"true", "3"));
@@ -1216,14 +1211,14 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   @ParameterizedTest
-  @CsvSource(value = {"true, AVRO", "true, SPARK", "false, AVRO", "false, 
SPARK"})
-  public void testAsyncClusteringJobWithRetry(boolean 
retryLastFailedClusteringJob, HoodieRecordType recordType) throws Exception {
+  @ValueSource(booleans = {true, false})
+  public void testAsyncClusteringJobWithRetry(boolean 
retryLastFailedClusteringJob) throws Exception {
     String tableBasePath = basePath + "/asyncClustering3";
 
     // ingest data
     int totalRecords = 3000;
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT);
-    addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(HoodieRecordType.AVRO, cfg.configs);
     cfg.continuousMode = false;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "false", 
"0", "false", "0"));
@@ -1251,7 +1246,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // trigger a scheduleAndExecute clustering job
     // when retryFailedClustering true => will rollback and re-execute failed 
clustering plan with same instant timestamp.
     // when retryFailedClustering false => will make and execute a new 
clustering plan with new instant timestamp.
-    HoodieClusteringJob scheduleAndExecute = 
initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute", 
retryLastFailedClusteringJob, recordType);
+    HoodieClusteringJob scheduleAndExecute = 
initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute", 
retryLastFailedClusteringJob, HoodieRecordType.AVRO);
     scheduleAndExecute.cluster(0);
 
     String completeClusteringTimeStamp = 
meta.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
@@ -1265,11 +1260,11 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   @ParameterizedTest
-  @CsvSource(value = {"execute, AVRO", "schedule, AVRO", "scheduleAndExecute, 
AVRO", "execute, SPARK", "schedule, SPARK", "scheduleAndExecute, SPARK"})
-  public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String 
runningMode, HoodieRecordType recordType) throws Exception {
+  @ValueSource(strings = {"execute", "schedule", "scheduleAndExecute"})
+  public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String 
runningMode) throws Exception {
     String tableBasePath = basePath + "/asyncClustering2";
-    HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, 
"false", recordType, WriteOperationType.BULK_INSERT);
-    HoodieClusteringJob scheduleClusteringJob = 
initialHoodieClusteringJob(tableBasePath, null, true, runningMode, recordType);
+    HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, 
"false", HoodieRecordType.AVRO, WriteOperationType.BULK_INSERT);
+    HoodieClusteringJob scheduleClusteringJob = 
initialHoodieClusteringJob(tableBasePath, null, true, runningMode, 
HoodieRecordType.AVRO);
 
     deltaStreamerTestRunner(ds, (r) -> {
       Exception exception = null;
@@ -1475,9 +1470,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
    * step involves using a SQL template to transform a source TEST-DATA-SOURCE 
============================> HUDI TABLE
    * 1 ===============> HUDI TABLE 2 (incr-pull with transform) (incr-pull) 
Hudi Table 1 is synced with Hive.
    */
-  @ParameterizedTest
-  @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
-  public void 
testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline(HoodieRecordType
 recordType) throws Exception {
+  @Test
+  public void 
testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() throws 
Exception {
+    HoodieRecordType recordType = HoodieRecordType.AVRO;
     String tableBasePath = basePath + "/" + recordType.toString() + 
"/test_table2";
     String downstreamTableBasePath = basePath + "/" + recordType.toString() + 
"/test_downstream_table2";
 
@@ -1648,14 +1643,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertFalse(props.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()));
   }
 
-  @ParameterizedTest
-  @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
-  public void testFilterDupes(HoodieRecordType recordType) throws Exception {
+  @Test
+  public void testFilterDupes() throws Exception {
     String tableBasePath = basePath + "/test_dupes_table";
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT);
-    addRecordMerger(recordType, cfg.configs);
     new HoodieDeltaStreamer(cfg, jsc).sync();
     assertRecordCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
@@ -1676,7 +1669,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieTableMetaClient mClient = 
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
     HoodieInstant lastFinished = 
mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
     HoodieDeltaStreamer.Config cfg2 = 
TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
-    addRecordMerger(recordType, cfg2.configs);
+    addRecordMerger(HoodieRecordType.AVRO, cfg2.configs);
     cfg2.filterDupes = false;
     cfg2.sourceLimit = 2000;
     cfg2.operation = WriteOperationType.UPSERT;
@@ -2838,9 +2831,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   @ParameterizedTest
-  @CsvSource(value = {"COPY_ON_WRITE, AVRO", "MERGE_ON_READ, AVRO",
-      "COPY_ON_WRITE, SPARK", "MERGE_ON_READ, SPARK"})
-  public void testConfigurationHotUpdate(HoodieTableType tableType, 
HoodieRecordType recordType) throws Exception {
+  @EnumSource(HoodieTableType.class)
+  public void testConfigurationHotUpdate(HoodieTableType tableType) throws 
Exception {
+    HoodieRecordType recordType = HoodieRecordType.AVRO;
     String tableBasePath = basePath + 
String.format("/configurationHotUpdate_%s_%s", tableType.name(), 
recordType.name());
 
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);

Reply via email to