This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.14.1-spark35-scala213
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 8b62b63bbca33255e3ceb87b49b680f06af7d750
Author: vinoth chandar <[email protected]>
AuthorDate: Tue Jan 23 10:24:29 2024 +0530

    [MINOR] Reduce UT spark-datasource test times (#10547)

    * [MINOR] Reduce UT spark-datasource test times

    * Reverting the parallelism change
---
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 51 +++++++-------
 .../apache/hudi/functional/TestCOWDataSource.scala | 23 ++++---
 .../functional/TestDataSourceForBootstrap.scala    | 35 ++++------
 .../hudi/functional/TestSparkDataSource.scala      | 80 +++++++---------
 4 files changed, 75 insertions(+), 114 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 38221cc05c7..c4f2680f4ed 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -702,15 +702,11 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
    */
   @ParameterizedTest
   @CsvSource(value = Array(
-    "COPY_ON_WRITE,true",
-    "COPY_ON_WRITE,false",
-    "MERGE_ON_READ,true",
-    "MERGE_ON_READ,false"
+    "COPY_ON_WRITE",
+    "MERGE_ON_READ"
   ))
-  def testSchemaEvolutionForTableType(tableType: String, allowColumnDrop: Boolean): Unit = {
-    val opts = getCommonParams(tempPath, hoodieFooTableName, tableType) ++ Map(
-      HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> allowColumnDrop.toString
-    )
+  def testSchemaEvolutionForTableType(tableType: String): Unit = {
+    val opts = getCommonParams(tempPath, hoodieFooTableName, tableType)

     // Create new table
     // NOTE: We disable Schema Reconciliation by default (such that Writer's
@@ -801,28 +797,30 @@ def testBulkInsertForDropPartitionColumn(): Unit = {

     val df5 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)

-    if (allowColumnDrop) {
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, noReconciliationOpts, df5)
-
-      val snapshotDF5 = spark.read.format("org.apache.hudi")
-        .load(tempBasePath + "/*/*/*/*")
-
-      assertEquals(35, snapshotDF5.count())
+    // assert error is thrown when dropping is not allowed
+    val disallowOpts = noReconciliationOpts ++ Map(
+      HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> false.toString
+    )
+    assertThrows[SchemaCompatibilityException] {
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, disallowOpts, df5)
+    }

-      assertEquals(df5.intersect(dropMetaFields(snapshotDF5)).except(df5).count, 0)
+    // passes when allowed.
+    val allowOpts = noReconciliationOpts ++ Map(
+      HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> true.toString
+    )
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, allowOpts, df5)

-      val fifthBatchActualSchema = fetchActualSchema()
-      val fifthBatchExpectedSchema = {
-        val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieFooTableName)
-        AvroConversionUtils.convertStructTypeToAvroSchema(df5.schema, structName, nameSpace)
-      }
+    val snapshotDF5 = spark.read.format("org.apache.hudi").load(tempBasePath + "/*/*/*/*")
+    assertEquals(35, snapshotDF5.count())
+    assertEquals(df5.intersect(dropMetaFields(snapshotDF5)).except(df5).count, 0)

-      assertEquals(fifthBatchExpectedSchema, fifthBatchActualSchema)
-    } else {
-      assertThrows[SchemaCompatibilityException] {
-        HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, noReconciliationOpts, df5)
-      }
+    val fifthBatchActualSchema = fetchActualSchema()
+    val fifthBatchExpectedSchema = {
+      val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieFooTableName)
+      AvroConversionUtils.convertStructTypeToAvroSchema(df5.schema, structName, nameSpace)
     }
+    assertEquals(fifthBatchExpectedSchema, fifthBatchActualSchema)
   }

 /**
@@ -1418,7 +1416,6 @@ object TestHoodieSparkSqlWriter {

   def deletePartitionsWildcardTestParams(): java.util.stream.Stream[Arguments] = {
     java.util.stream.Stream.of(
-      arguments("2015/03/*", Seq("2016/03/15")),
       arguments("*5/03/1*", Seq("2016/03/15")),
       arguments("2016/03/*", Seq("2015/03/16", "2015/03/17")))
   }

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index f500ea83120..b6b881c2b70 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -658,7 +658,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     val countDownLatch = new CountDownLatch(2)
     for (x <- 1 to 2) {
       val thread = new Thread(new UpdateThread(dataGen, spark, commonOpts, basePath, x + "00", countDownLatch, numRetries))
-      thread.setName((x + "00_THREAD").toString())
+      thread.setName(x + "00_THREAD")
       thread.start()
     }
     countDownLatch.await(1, TimeUnit.MINUTES)
@@ -682,15 +682,18 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup

       val insertRecs = recordsToStrings(dataGen.generateInserts(instantTime, 1000)).toList
       val updateDf = spark.read.json(spark.sparkContext.parallelize(updateRecs, 2))
       val insertDf = spark.read.json(spark.sparkContext.parallelize(insertRecs, 2))
-      updateDf.union(insertDf).write.format("org.apache.hudi")
-        .options(commonOpts)
-        .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
-        .option("hoodie.cleaner.policy.failed.writes", "LAZY")
-        .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider")
-        .option(HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.key(), numRetries.toString)
-        .mode(SaveMode.Append)
-        .save(basePath)
-      countDownLatch.countDown()
+      try {
+        updateDf.union(insertDf).write.format("org.apache.hudi")
+          .options(commonOpts)
+          .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
+          .option("hoodie.cleaner.policy.failed.writes", "LAZY")
+          .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider")
"org.apache.hudi.client.transaction.lock.InProcessLockProvider") + .option(HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.key(), numRetries.toString) + .mode(SaveMode.Append) + .save(basePath) + } finally { + countDownLatch.countDown() + } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala index 9949b396abf..c8445fefd07 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala @@ -171,8 +171,8 @@ class TestDataSourceForBootstrap { @CsvSource(value = Array( "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector,AVRO", // TODO(HUDI-5807) enable for spark native records - /* "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,SPARK", */ - "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,AVRO", + /* "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,SPARK", + "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,AVRO",*/ "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,SPARK" )) def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapSelector: String, recordType: HoodieRecordType): Unit = { @@ -252,11 +252,8 @@ class TestDataSourceForBootstrap { verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = true) } - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], - // TODO(HUDI-5807) enable for spark native records - names = Array("AVRO" /*, "SPARK" */)) - def testMetadataBootstrapCOWPartitioned(recordType: HoodieRecordType): Unit = { + @Test + def testMetadataBootstrapCOWPartitioned(): Unit = { val timestamp = Instant.now.toEpochMilli val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext) @@ -268,7 +265,7 @@ class TestDataSourceForBootstrap { .mode(SaveMode.Overwrite) .save(srcPath) - val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map( + val writeOpts = commonOpts ++ getRecordTypeOpts(HoodieRecordType.AVRO) ++ Map( DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true", DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr" ) @@ -331,9 +328,8 @@ class TestDataSourceForBootstrap { verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, isPartitioned = true, isHiveStylePartitioned = true) } - @ParameterizedTest - @ValueSource(booleans = Array(true, false)) - def testMetadataBootstrapMORPartitionedInlineClustering(enableRowWriter: Boolean): Unit = { + @Test + def testMetadataBootstrapMORPartitionedInlineClustering(): Unit = { val timestamp = Instant.now.toEpochMilli val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext) // Prepare source data @@ -343,7 +339,7 @@ class TestDataSourceForBootstrap { .mode(SaveMode.Overwrite) .save(srcPath) - val writeOpts = commonOpts ++ getRecordTypeOpts(HoodieRecordType.AVRO) ++ Map( + val writeOpts = commonOpts ++ Map( DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true", DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr" ) @@ -370,7 +366,6 @@ class TestDataSourceForBootstrap { .options(writeOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) 
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, enableRowWriter.toString)
       .option(HoodieClusteringConfig.INLINE_CLUSTERING.key, "true")
       .option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key, "1")
       .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "datestr")
@@ -464,9 +459,8 @@ class TestDataSourceForBootstrap {
     assertEquals(numRecordsUpdate, hoodieROViewDFWithBasePath.filter(s"timestamp == $updateTimestamp").count())
   }

-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
-  def testMetadataBootstrapMORPartitioned(recordType: HoodieRecordType): Unit = {
+  @Test
+  def testMetadataBootstrapMORPartitioned(): Unit = {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)

@@ -478,7 +472,7 @@ class TestDataSourceForBootstrap {
       .mode(SaveMode.Overwrite)
       .save(srcPath)

-    val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map(
+    val writeOpts = commonOpts ++ Map(
       DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr"
     )
@@ -550,9 +544,8 @@ class TestDataSourceForBootstrap {
     assertEquals(0, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count())
   }

-  @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
-  def testFullBootstrapCOWPartitioned(recordType: HoodieRecordType): Unit = {
+  @Test
+  def testFullBootstrapCOWPartitioned(): Unit = {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)

@@ -564,7 +557,7 @@ class TestDataSourceForBootstrap {
       .mode(SaveMode.Overwrite)
       .save(srcPath)

-    val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map(
+    val writeOpts = commonOpts ++ Map(
       DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr"
     )
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 3f64e24dfc9..7b93f98b97c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -51,26 +51,16 @@ class TestSparkDataSource extends SparkClientFunctionalTestHarness {

   @ParameterizedTest
   @CsvSource(value = Array(
-    "COPY_ON_WRITE|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "COPY_ON_WRITE|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "COPY_ON_WRITE|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "COPY_ON_WRITE|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
-    "COPY_ON_WRITE|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
-    "COPY_ON_WRITE|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
-    "COPY_ON_WRITE|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
-    "COPY_ON_WRITE|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
-    "COPY_ON_WRITE|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
-    "MERGE_ON_READ|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
-    "MERGE_ON_READ|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
"MERGE_ON_READ|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - "MERGE_ON_READ|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "MERGE_ON_READ|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "MERGE_ON_READ|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "MERGE_ON_READ|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM" + "COPY_ON_WRITE|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "COPY_ON_WRITE|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "COPY_ON_WRITE|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "MERGE_ON_READ|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "MERGE_ON_READ|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM" ), delimiter = '|') - def testCoreFlow(tableType: String, isMetadataEnabledOnWrite: Boolean, isMetadataEnabledOnRead: Boolean, keyGenClass: String, indexType: String): Unit = { + def testCoreFlow(tableType: String, keyGenClass: String, indexType: String): Unit = { + val isMetadataEnabledOnWrite = true + val isMetadataEnabledOnRead = true val partitionField = if (classOf[NonpartitionedKeyGenerator].getName.equals(keyGenClass)) "" else "partition" val options: Map[String, String] = commonOpts + (HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabledOnWrite)) + @@ -216,44 +206,22 @@ class TestSparkDataSource extends SparkClientFunctionalTestHarness { @ParameterizedTest @CsvSource(value = Array( - "COPY_ON_WRITE|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - "COPY_ON_WRITE|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - "COPY_ON_WRITE|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - "COPY_ON_WRITE|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "COPY_ON_WRITE|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "COPY_ON_WRITE|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "COPY_ON_WRITE|insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "COPY_ON_WRITE|insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "COPY_ON_WRITE|insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - "MERGE_ON_READ|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - "MERGE_ON_READ|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - "MERGE_ON_READ|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "MERGE_ON_READ|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "MERGE_ON_READ|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "MERGE_ON_READ|insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "COPY_ON_WRITE|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - "COPY_ON_WRITE|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - 
"COPY_ON_WRITE|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - "COPY_ON_WRITE|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "COPY_ON_WRITE|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "COPY_ON_WRITE|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "COPY_ON_WRITE|bulk_insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "COPY_ON_WRITE|bulk_insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "COPY_ON_WRITE|bulk_insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - "MERGE_ON_READ|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - "MERGE_ON_READ|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", - "MERGE_ON_READ|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "MERGE_ON_READ|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "MERGE_ON_READ|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", - "MERGE_ON_READ|bulk_insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|bulk_insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|bulk_insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM" + "COPY_ON_WRITE|insert|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "COPY_ON_WRITE|insert|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "COPY_ON_WRITE|insert|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|insert|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "MERGE_ON_READ|insert|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "MERGE_ON_READ|insert|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "COPY_ON_WRITE|bulk_insert|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "COPY_ON_WRITE|bulk_insert|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "COPY_ON_WRITE|bulk_insert|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM", + "MERGE_ON_READ|bulk_insert|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM", + "MERGE_ON_READ|bulk_insert|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE", + "MERGE_ON_READ|bulk_insert|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM" ), delimiter = '|') - def testImmutableUserFlow(tableType: String, operation: String, isMetadataEnabledOnWrite: Boolean, isMetadataEnabledOnRead: Boolean, keyGenClass: String, indexType: String): Unit = { + def testImmutableUserFlow(tableType: String, operation: String, keyGenClass: String, indexType: String): Unit = { + val isMetadataEnabledOnWrite = true + val isMetadataEnabledOnRead = true val partitionField = if (classOf[NonpartitionedKeyGenerator].getName.equals(keyGenClass)) "" else "partition" val options: Map[String, String] = commonOpts + (HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabledOnWrite)) +
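Finally, the TestSparkDataSource hunks shrink each @CsvSource matrix by pinning the two metadata booleans to true inside the test body instead of enumerating them per row, cutting testCoreFlow from 18 to 6 invocations and testImmutableUserFlow from 36 to 12. A hedged sketch of the same JUnit 5 pipe-delimited pattern (hypothetical test class and row values, not the Hudi code itself):

    import org.junit.jupiter.params.ParameterizedTest
    import org.junit.jupiter.params.provider.CsvSource

    class MatrixSketch {
      // One row per (tableType, keyGen, index) combination; the metadata flags
      // are pinned below instead of multiplying the matrix by 3 per combination.
      @ParameterizedTest
      @CsvSource(value = Array(
        "COPY_ON_WRITE|SimpleKeyGenerator|BLOOM",
        "MERGE_ON_READ|NonpartitionedKeyGenerator|GLOBAL_BLOOM"
      ), delimiter = '|')
      def coreFlow(tableType: String, keyGen: String, indexType: String): Unit = {
        val metadataEnabledOnWrite = true // formerly a CSV column
        val metadataEnabledOnRead = true  // formerly a CSV column
        // ... exercise the write/read path with these settings ...
        assert(Set("COPY_ON_WRITE", "MERGE_ON_READ").contains(tableType))
      }
    }

The pipe delimiter is what lets fully-qualified class names containing commas-free dots ride in a single CSV column, exactly as the real tests do.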