This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 53725e8c91c2a904bf00d7a4e97556b7adca068c Author: Y Ethan Guo <[email protected]> AuthorDate: Sat Mar 2 21:56:54 2024 -0800 [HUDI-7469] Reduce redundant tests with Hudi record types (#10800) --- .../apache/hudi/functional/TestCOWDataSource.scala | 72 +++--- .../apache/hudi/functional/TestMORDataSource.scala | 20 +- .../sql/hudi/TestAlterTableDropPartition.scala | 4 +- .../spark/sql/hudi/TestCompactionTable.scala | 4 +- .../apache/spark/sql/hudi/TestInsertTable.scala | 257 ++++++++++----------- .../apache/spark/sql/hudi/TestMergeIntoTable.scala | 24 +- .../spark/sql/hudi/TestMergeIntoTable2.scala | 20 +- .../TestMergeIntoTableWithNonRecordKeyField.scala | 8 +- .../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 16 +- .../spark/sql/hudi/TestTimeTravelTable.scala | 12 +- .../apache/spark/sql/hudi/TestUpdateTable.scala | 6 +- .../deltastreamer/TestHoodieDeltaStreamer.java | 63 +++-- 12 files changed, 242 insertions(+), 264 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index a28a228fd46..5614b414927 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -23,8 +23,9 @@ import org.apache.hudi.DataSourceWriteOptions.{INLINE_CLUSTERING_ENABLE, KEYGENE import org.apache.hudi.HoodieConversionUtils.toJavaOption import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs} import org.apache.hudi.client.common.HoodieSparkEngineContext -import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD} import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD} +import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType} import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, TimelineUtils} @@ -44,7 +45,6 @@ import org.apache.hudi.metrics.{Metrics, MetricsReporterType} import org.apache.hudi.testutils.HoodieSparkClientTestBase import org.apache.hudi.util.JFunction import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, QuickstartUtils, ScalaAssertionSupport} -import org.apache.hudi.common.fs.FSUtils import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.hudi.HoodieSparkSessionExtension @@ -96,10 +96,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup System.gc() } - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testShortNameStorage(recordType: HoodieRecordType) { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType) + @Test + def testShortNameStorage(): Unit = { + val (writeOpts, readOpts) = getWriterReaderOpts() // Insert Operation val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList @@ -564,10 +563,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup * archival should kick in and 2 commits should be archived. If schema is valid, no exception will be thrown. If not, * NPE will be thrown. */ - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testArchivalWithBulkInsert(recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType) + @Test + def testArchivalWithBulkInsert(): Unit = { + val (writeOpts, readOpts) = getWriterReaderOpts() var structType: StructType = null for (i <- 1 to 7) { @@ -696,10 +694,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup } } - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testOverWriteModeUseReplaceAction(recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType) + @Test + def testOverWriteModeUseReplaceAction(): Unit = { + val (writeOpts, readOpts) = getWriterReaderOpts() val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) inputDF1.write.format("org.apache.hudi") @@ -774,10 +771,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup assertEquals(expectedCount, hudiReadPathDF.count()) } - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testOverWriteTableModeUseReplaceAction(recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType) + @Test + def testOverWriteTableModeUseReplaceAction(): Unit = { + val (writeOpts, readOpts) = getWriterReaderOpts() val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) @@ -804,10 +800,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup assertEquals("replacecommit", commits(1)) } - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testOverWriteModeUseReplaceActionOnDisJointPartitions(recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType) + @Test + def testOverWriteModeUseReplaceActionOnDisJointPartitions(): Unit = { + val (writeOpts, readOpts) = getWriterReaderOpts() // step1: Write 5 records to hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH val records1 = recordsToStrings(dataGen.generateInsertsForPartition("001", 5, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList @@ -864,10 +859,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup assertEquals("replacecommit", commits(2)) } - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testOverWriteTableModeUseReplaceActionOnDisJointPartitions(recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType) + @Test + def testOverWriteTableModeUseReplaceActionOnDisJointPartitions(): Unit = { + val (writeOpts, readOpts) = getWriterReaderOpts() // step1: Write 5 records to hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH val records1 = recordsToStrings(dataGen.generateInsertsForPartition("001", 5, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList @@ -1003,10 +997,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup }) } - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testWithAutoCommitOn(recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType) + @Test + def testWithAutoCommitOn(): Unit = { + val (writeOpts, readOpts) = getWriterReaderOpts() val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) @@ -1318,8 +1311,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup @ParameterizedTest @CsvSource(Array( - "true,false,AVRO", "true,true,AVRO", "false,true,AVRO", "false,false,AVRO", - "true,false,SPARK", "true,true,SPARK", "false,true,SPARK", "false,false,SPARK" + "true,false,AVRO", "true,true,AVRO", "false,true,AVRO", "false,false,AVRO" )) def testQueryCOWWithBasePathAndFileIndex(partitionEncode: Boolean, isMetadataEnabled: Boolean, recordType: HoodieRecordType): Unit = { testPartitionPruning( @@ -1516,11 +1508,10 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup } } - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testSaveAsTableInDifferentModes(recordType: HoodieRecordType): Unit = { + @Test + def testSaveAsTableInDifferentModes(): Unit = { val options = scala.collection.mutable.Map.empty ++ commonOpts ++ Map("path" -> basePath) - val (writeOpts, readOpts) = getWriterReaderOpts(recordType, options.toMap) + val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO, options.toMap) // first use the Overwrite mode val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList @@ -1583,10 +1574,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup assertEquals(spark.read.format("hudi").options(readOpts).load(basePath).count(), 9) } - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testMetricsReporterViaDataSource(recordType: HoodieRecordType): Unit = { - val (writeOpts, _) = getWriterReaderOpts(recordType, getQuickstartWriteConfigs.asScala.toMap) + @Test + def testMetricsReporterViaDataSource(): Unit = { + val (writeOpts, _) = getWriterReaderOpts(HoodieRecordType.AVRO, getQuickstartWriteConfigs.asScala.toMap) val dataGenerator = new QuickstartUtils.DataGenerator() val records = convertToStringList(dataGenerator.generateInserts(10)) @@ -1680,7 +1670,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup }) } - def getWriterReaderOpts(recordType: HoodieRecordType, + def getWriterReaderOpts(recordType: HoodieRecordType = HoodieRecordType.AVRO, opt: Map[String, String] = commonOpts, enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()): (Map[String, String], Map[String, String]) = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index b1d3a17004b..45bd3c645d4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -27,7 +27,7 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType import org.apache.hudi.common.model._ import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.testutils.HoodieTestDataGenerator -import org.apache.hudi.common.testutils.RawTripTestPayload.{recordToString, recordsToStrings} +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.common.util import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable @@ -44,7 +44,7 @@ import org.apache.spark.sql.types.BooleanType import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{CsvSource, EnumSource} +import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource} import org.slf4j.LoggerFactory import java.util.function.Consumer @@ -948,10 +948,9 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin assertEquals(20, spark.read.format("hudi").options(readOpts).load(basePath).count()) } - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testTempFilesCleanForClustering(recordType: HoodieRecordType): Unit = { - val (writeOpts, readOpts) = getWriterReaderOpts(recordType) + @Test + def testTempFilesCleanForClustering(): Unit = { + val (writeOpts, readOpts) = getWriterReaderOpts() val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).asScala val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) @@ -1230,9 +1229,8 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin * The read-optimized query should read `fg1_dc1.parquet` only in this case. */ @ParameterizedTest - @CsvSource(Array("true,AVRO", "true,SPARK", "false,AVRO", "false,SPARK")) - def testReadOptimizedQueryAfterInflightCompactionAndCompletedDeltaCommit(enableFileIndex: Boolean, - recordType: HoodieRecordType): Unit = { + @ValueSource(booleans = Array(true, false)) + def testReadOptimizedQueryAfterInflightCompactionAndCompletedDeltaCommit(enableFileIndex: Boolean): Unit = { val (tableName, tablePath) = ("hoodie_mor_ro_read_test_table", s"${basePath}_mor_test_table") val precombineField = "col3" val recordKeyField = "key" @@ -1250,7 +1248,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin "hoodie.upsert.shuffle.parallelism" -> "1") val pathForROQuery = getPathForROQuery(tablePath, !enableFileIndex, 0) - val (writeOpts, readOpts) = getWriterReaderOpts(recordType, options, enableFileIndex) + val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO, options, enableFileIndex) // First batch with all inserts // Deltacommit1 (DC1, completed), writing file group 1 (fg1) @@ -1383,7 +1381,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin assertEquals(inputRows, readRows) } - def getWriterReaderOpts(recordType: HoodieRecordType, + def getWriterReaderOpts(recordType: HoodieRecordType = HoodieRecordType.AVRO, opt: Map[String, String] = commonOpts, enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()): (Map[String, String], Map[String, String]) = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala index 2c592f5a815..7a146591f4e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala @@ -621,7 +621,7 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase { } test("Test drop partition with wildcards") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow", "mor").foreach { tableType => val tableName = generateTableName spark.sql( @@ -653,6 +653,6 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase { Seq("2023-09-01") ) } - }) + } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala index 568e3569725..5ded75dcdab 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala @@ -75,7 +75,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase { } test("Test compaction path") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => val tableName = generateTableName spark.sql( s""" @@ -132,6 +132,6 @@ class TestCompactionTable extends HoodieSparkSqlTestBase { checkException(s"run compaction on '${tmp.getCanonicalPath}' at 12345")( s"specific 12345 instants is not exist" ) - }) + } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 602881b6d2d..38f2e4e428c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -38,7 +38,7 @@ import java.io.File class TestInsertTable extends HoodieSparkSqlTestBase { test("Test table type name incase-sensitive test") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => val targetTable = generateTableName val tablePath = s"${tmp.getCanonicalPath}/$targetTable" @@ -86,7 +86,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { Seq("1", "aa", 123, "2024-02-19", 10), Seq("2", "bb", 456, "2024-02-19", 10) ) - }) + } } test("Test Insert Into with values") { @@ -125,7 +125,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } test("Test Insert Into with static partition") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => val tableName = generateTableName // Create a partitioned table spark.sql( @@ -174,11 +174,11 @@ class TestInsertTable extends HoodieSparkSqlTestBase { Seq(2, "a2", 20.0, 2000, "2021-01-06"), Seq(3, "a3", 30.0, 3000, "2021-01-07") ) - }) + } } test("Test Insert Into with dynamic partition") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => val tableName = generateTableName // Create a partitioned table spark.sql( @@ -228,7 +228,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { Seq(2, "a2", 20.0, 2000, "2021-01-06"), Seq(3, "a3", 30.0, 3000, "2021-01-07") ) - }) + } } test("Test Insert Into with multi partition") { @@ -409,7 +409,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } test("Test Insert Overwrite") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow", "mor").foreach { tableType => withTable(generateTableName) { tableName => // Create a partitioned table @@ -554,7 +554,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { ) } } - }) + } } test("Test insert overwrite for multi partitioned table") { @@ -656,19 +656,19 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } test("Test Different Type of Partition Column") { - withRecordType()(withTempDir { tmp => - val typeAndValue = Seq( - ("string", "'1000'"), - ("int", 1000), - ("bigint", 10000), - ("timestamp", "TIMESTAMP'2021-05-20 00:00:00'"), - ("date", "DATE'2021-05-20'") - ) - typeAndValue.foreach { case (partitionType, partitionValue) => - val tableName = generateTableName - validateDifferentTypesOfPartitionColumn(tmp, partitionType, partitionValue, tableName) - } - }) + withTempDir { tmp => + val typeAndValue = Seq( + ("string", "'1000'"), + ("int", 1000), + ("bigint", 10000), + ("timestamp", "TIMESTAMP'2021-05-20 00:00:00'"), + ("date", "DATE'2021-05-20'") + ) + typeAndValue.foreach { case (partitionType, partitionValue) => + val tableName = generateTableName + validateDifferentTypesOfPartitionColumn(tmp, partitionType, partitionValue, tableName) + } + } } test("Test TimestampType Partition Column With Consistent Logical Timestamp Enabled") { @@ -686,7 +686,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } test("Test insert for uppercase table name") { - withRecordType()(withTempDir{ tmp => + withTempDir { tmp => val tableName = s"H_$generateTableName" if (HoodieSparkUtils.gteqSpark3_5) { // [SPARK-44284] Spark 3.5+ requires conf below to be case sensitive @@ -713,84 +713,82 @@ class TestInsertTable extends HoodieSparkSqlTestBase { .setConf(spark.sessionState.newHadoopConf()) .build() assertResult(tableName)(metaClient.getTableConfig.getTableName) - }) + } } test("Test Insert Exception") { - withRecordType() { - val tableName = generateTableName + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | dt string + |) using hudi + | tblproperties (primaryKey = 'id') + | partitioned by (dt) + """.stripMargin) + val tooManyDataColumnsErrorMsg = if (HoodieSparkUtils.gteqSpark3_5) { + s""" + |[INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS] Cannot write to `spark_catalog`.`default`.`$tableName`, the reason is too many data columns: + |Table columns: `id`, `name`, `price`. + |Data columns: `1`, `a1`, `10`, `2021-06-20`. + |""".stripMargin + } else if (HoodieSparkUtils.gteqSpark3_4) { + """ + |too many data columns: + |Table columns: 'id', 'name', 'price'. + |Data columns: '1', 'a1', '10', '2021-06-20'. + |""".stripMargin + } else { + """ + |too many data columns: + |Table columns: 'id', 'name', 'price' + |Data columns: '1', 'a1', '10', '2021-06-20' + |""".stripMargin + } + checkExceptionContain(s"insert into $tableName partition(dt = '2021-06-20') select 1, 'a1', 10, '2021-06-20'")( + tooManyDataColumnsErrorMsg) + + val notEnoughDataColumnsErrorMsg = if (HoodieSparkUtils.gteqSpark3_5) { + s""" + |[INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS] Cannot write to `spark_catalog`.`default`.`$tableName`, the reason is not enough data columns: + |Table columns: `id`, `name`, `price`, `dt`. + |Data columns: `1`, `a1`, `10`. + |""".stripMargin + } else if (HoodieSparkUtils.gteqSpark3_4) { + """ + |not enough data columns: + |Table columns: 'id', 'name', 'price', 'dt'. + |Data columns: '1', 'a1', '10'. + |""".stripMargin + } else { + """ + |not enough data columns: + |Table columns: 'id', 'name', 'price', 'dt' + |Data columns: '1', 'a1', '10' + |""".stripMargin + } + checkExceptionContain(s"insert into $tableName select 1, 'a1', 10")(notEnoughDataColumnsErrorMsg) + withSQLConf("hoodie.sql.bulk.insert.enable" -> "true", "hoodie.sql.insert.mode" -> "strict") { + val tableName2 = generateTableName spark.sql( s""" - |create table $tableName ( + |create table $tableName2 ( | id int, | name string, | price double, - | dt string + | ts long |) using hudi - | tblproperties (primaryKey = 'id') - | partitioned by (dt) - """.stripMargin) - val tooManyDataColumnsErrorMsg = if (HoodieSparkUtils.gteqSpark3_5) { - s""" - |[INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS] Cannot write to `spark_catalog`.`default`.`$tableName`, the reason is too many data columns: - |Table columns: `id`, `name`, `price`. - |Data columns: `1`, `a1`, `10`, `2021-06-20`. - |""".stripMargin - } else if (HoodieSparkUtils.gteqSpark3_4) { - """ - |too many data columns: - |Table columns: 'id', 'name', 'price'. - |Data columns: '1', 'a1', '10', '2021-06-20'. - |""".stripMargin - } else { - """ - |too many data columns: - |Table columns: 'id', 'name', 'price' - |Data columns: '1', 'a1', '10', '2021-06-20' - |""".stripMargin - } - checkExceptionContain(s"insert into $tableName partition(dt = '2021-06-20') select 1, 'a1', 10, '2021-06-20'")( - tooManyDataColumnsErrorMsg) - - val notEnoughDataColumnsErrorMsg = if (HoodieSparkUtils.gteqSpark3_5) { - s""" - |[INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS] Cannot write to `spark_catalog`.`default`.`$tableName`, the reason is not enough data columns: - |Table columns: `id`, `name`, `price`, `dt`. - |Data columns: `1`, `a1`, `10`. - |""".stripMargin - } else if (HoodieSparkUtils.gteqSpark3_4) { - """ - |not enough data columns: - |Table columns: 'id', 'name', 'price', 'dt'. - |Data columns: '1', 'a1', '10'. - |""".stripMargin - } else { - """ - |not enough data columns: - |Table columns: 'id', 'name', 'price', 'dt' - |Data columns: '1', 'a1', '10' - |""".stripMargin - } - checkExceptionContain(s"insert into $tableName select 1, 'a1', 10")(notEnoughDataColumnsErrorMsg) - withSQLConf("hoodie.sql.bulk.insert.enable" -> "true", "hoodie.sql.insert.mode" -> "strict") { - val tableName2 = generateTableName - spark.sql( - s""" - |create table $tableName2 ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | tblproperties ( - | primaryKey = 'id', - | preCombineField = 'ts' - | ) - """.stripMargin) - checkException(s"insert into $tableName2 values(1, 'a1', 10, 1000)")( - "Table with primaryKey can not use bulk insert in strict mode." - ) - } + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + checkException(s"insert into $tableName2 values(1, 'a1', 10, 1000)")( + "Table with primaryKey can not use bulk insert in strict mode." + ) } } @@ -823,8 +821,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase { test("Test bulk insert with insert into for single partitioned table") { withSQLConf("hoodie.sql.insert.mode" -> "non-strict") { - withRecordType()(withTempDir { tmp => - Seq("cow", "mor").foreach {tableType => + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => withTable(generateTableName) { tableName => spark.sql( s""" @@ -867,7 +865,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { ) } } - }) + } } } @@ -956,7 +954,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { test("Test bulk insert with CTAS") { withSQLConf("hoodie.sql.insert.mode" -> "non-strict", "hoodie.sql.bulk.insert.enable" -> "true") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow", "mor").foreach { tableType => withTable(generateTableName) { inputTable => spark.sql( @@ -998,13 +996,13 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } } - }) + } } } test("Test bulk insert with empty dataset") { withSQLConf(SPARK_SQL_INSERT_INTO_OPERATION.key -> WriteOperationType.BULK_INSERT.value()) { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow", "mor").foreach { tableType => withTable(generateTableName) { inputTable => spark.sql( @@ -1042,7 +1040,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } } - }) + } } } @@ -1054,7 +1052,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { Array() } withSQLConf(bulkInsertConf: _*) { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow", "mor").foreach { tableType => withTable(generateTableName) { inputTable => spark.sql( @@ -1098,14 +1096,14 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } } - }) + } } } } test("Test bulk insert with insert overwrite table") { withSQLConf(SPARK_SQL_INSERT_INTO_OPERATION.key -> WriteOperationType.BULK_INSERT.value()) { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow", "mor").foreach { tableType => withTable(generateTableName) { nonPartitionedTable => spark.sql( @@ -1132,13 +1130,13 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } } - }) + } } } test("Test bulk insert with insert overwrite partition") { withSQLConf(SPARK_SQL_INSERT_INTO_OPERATION.key -> WriteOperationType.BULK_INSERT.value()) { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow", "mor").foreach { tableType => withTable(generateTableName) { partitionedTable => spark.sql( @@ -1179,7 +1177,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } } - }) + } } } @@ -1355,7 +1353,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { test("Test Insert Into With Catalog Identifier for spark >= 3.2.0") { Seq("hudi", "parquet").foreach { format => - withRecordType()(withTempDir { tmp => + withTempDir { tmp => val tableName = s"spark_catalog.default.$generateTableName" // Create a partitioned table if (HoodieSparkUtils.gteqSpark3_2) { @@ -1392,7 +1390,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { Seq(2, "a2", 10.0, 1000, "2021-01-05") ) } - }) + } } } @@ -1663,7 +1661,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { test("Test Insert Overwrite Into Bucket Index Table") { withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") { Seq("mor", "cow").foreach { tableType => - withRecordType()(withTempDir { tmp => + withTempDir { tmp => val tableName = generateTableName // Create a partitioned table spark.sql( @@ -1708,14 +1706,14 @@ class TestInsertTable extends HoodieSparkSqlTestBase { checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")( Seq(13, "a2", 12.0, 1000, "2021-01-05") ) - }) + } } } } test("Test Insert Overwrite Into Consistent Bucket Index Table") { withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => val tableName = generateTableName // Create a partitioned table spark.sql( @@ -1768,13 +1766,13 @@ class TestInsertTable extends HoodieSparkSqlTestBase { checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")( Seq(13, "a3", 12.0, 1000, "2021-01-05") ) - }) + } } } test("Test Hudi should not record empty preCombineKey in hoodie.properties") { withSQLConf("hoodie.datasource.write.operation" -> "insert") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => val tableName = generateTableName spark.sql( s""" @@ -1802,7 +1800,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { Seq(2, "name2", 12.0), Seq(3, "name3", 13.0) ) - }) + } } } @@ -2004,17 +2002,17 @@ class TestInsertTable extends HoodieSparkSqlTestBase { spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode") spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy") spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation") - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow", "mor").foreach { tableType => withTable(generateTableName) { tableName => ingestAndValidateData(tableType, tableName, tmp, WriteOperationType.UPSERT) } } - }) + } } test("Test sql write operation with INSERT_INTO override both strict mode and sql write operation") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow", "mor").foreach { tableType => Seq(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT, WriteOperationType.UPSERT).foreach { operation => withTable(generateTableName) { tableName => @@ -2023,11 +2021,11 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } } - }) + } } test("Test sql write operation with INSERT_INTO override only sql write operation") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow", "mor").foreach { tableType => Seq(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT, WriteOperationType.UPSERT).foreach { operation => withTable(generateTableName) { tableName => @@ -2036,7 +2034,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } } - }) + } } test("Test sql write operation with INSERT_INTO override only strict mode") { @@ -2045,14 +2043,14 @@ class TestInsertTable extends HoodieSparkSqlTestBase { spark.sessionState.conf.unsetConf(DataSourceWriteOptions.INSERT_DUP_POLICY.key()) spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation") spark.sessionState.conf.unsetConf("hoodie.sql.bulk.insert.enable") - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow", "mor").foreach { tableType => withTable(generateTableName) { tableName => ingestAndValidateData(tableType, tableName, tmp, WriteOperationType.UPSERT, List("set hoodie.sql.insert.mode = upsert")) } } - }) + } } def ingestAndValidateData(tableType: String, tableName: String, tmp: File, @@ -2126,17 +2124,17 @@ class TestInsertTable extends HoodieSparkSqlTestBase { spark.sessionState.conf.unsetConf("hoodie.sql.insert.mode") spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy") spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation") - withRecordType()(withTempDir { tmp => - Seq("cow","mor").foreach { tableType => + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => withTable(generateTableName) { tableName => ingestAndValidateDataNoPrecombine(tableType, tableName, tmp, WriteOperationType.INSERT) } } - }) + } } test("Test inaccurate index type") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => val targetTable = generateTableName assertThrows[IllegalArgumentException] { @@ -2164,7 +2162,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { |""".stripMargin) } } - }) + } } test("Test vectorized read nested columns for LegacyHoodieParquetFileFormat") { @@ -2270,7 +2268,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } test("Test insert dup policy with INSERT_INTO explicit new configs INSERT operation ") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow", "mor").foreach { tableType => val operation = WriteOperationType.INSERT Seq(NONE_INSERT_DUP_POLICY, DROP_INSERT_DUP_POLICY).foreach { dupPolicy => @@ -2282,11 +2280,11 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } } - }) + } } test("Test insert dup policy with INSERT_INTO explicit new configs BULK_INSERT operation ") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow").foreach { tableType => val operation = WriteOperationType.BULK_INSERT val dupPolicy = NONE_INSERT_DUP_POLICY @@ -2297,7 +2295,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { dupPolicy) } } - }) + } } test("Test DROP insert dup policy with INSERT_INTO explicit new configs BULK INSERT operation") { @@ -2364,7 +2362,6 @@ class TestInsertTable extends HoodieSparkSqlTestBase { }) } - def ingestAndValidateDataDupPolicy(tableType: String, tableName: String, tmp: File, expectedOperationtype: WriteOperationType = WriteOperationType.INSERT, setOptions: List[String] = List.empty, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala index 90398f4689f..b56ca09ab96 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala @@ -130,7 +130,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo test("Test MergeInto with more than once update actions for spark >= 3.1.x") { if (HoodieSparkUtils.gteqSpark3_1) { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => val targetTable = generateTableName spark.sql( s""" @@ -179,7 +179,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo checkAnswer(s"select id, name, data, country, ts from $targetTable")( Seq(1, "lb", 5, "shu", 1646643196L) ) - }) + } } } @@ -469,7 +469,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo } test("Test MergeInto for MOR table ") { - withRecordType()(withTempDir {tmp => + withTempDir {tmp => spark.sql("set hoodie.payload.combined.schema.validate = true") val tableName = generateTableName // Create a mor partitioned table. @@ -597,11 +597,11 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo checkAnswer(s"select id,name,price,dt from $tableName order by id")( Seq(1, "a1", 12, "2021-03-21") ) - }) + } } test("Test MergeInto with insert only") { - withRecordType()(withTempDir {tmp => + withTempDir {tmp => spark.sql("set hoodie.payload.combined.schema.validate = true") // Create a partitioned mor table val tableName = generateTableName @@ -652,7 +652,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo Seq(1, "a1", 10, "2021-03-21"), Seq(2, "a2", 10, "2021-03-20") ) - }) + } } test("Test MergeInto For PreCombineField") { @@ -730,7 +730,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo } test("Test MergeInto with preCombine field expression") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => spark.sql("set hoodie.payload.combined.schema.validate = true") Seq("cow", "mor").foreach { tableType => val tableName1 = generateTableName @@ -808,11 +808,11 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo Seq(1, "a1", 24, "2021-03-21", 1002) ) } - }) + } } test("Test MergeInto with primaryKey expression") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => spark.sql("set hoodie.payload.combined.schema.validate = true") val tableName1 = generateTableName spark.sql( @@ -908,7 +908,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo checkAnswer(s"select id,name,price,v,dt from $tableName1 order by id")( Seq(1, "a1", 10, 1000, "2021-03-21") ) - }) + } } test("Test MergeInto with combination of delete update insert") { @@ -1082,7 +1082,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo } test("Test Different Type of PreCombineField") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => spark.sql("set hoodie.payload.combined.schema.validate = true") val typeAndValue = Seq( ("string", "'1000'"), @@ -1138,7 +1138,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo Seq(1, "a1", 20.0) ) } - }) + } } test("Test MergeInto For MOR With Compaction On") { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala index ef76cb72ca5..8ea7284e840 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala @@ -142,7 +142,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { } test("Test Merge Into CTAS Table") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => spark.sql("set hoodie.payload.combined.schema.validate = true") val tableName = generateTableName spark.sql( @@ -174,7 +174,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { checkAnswer(s"select id, name from $tableName")( Seq(1, "a1_1") ) - }) + } } test("Test Merge With Complex Data Type") { @@ -242,7 +242,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { } test("Test column name matching for insert * and update set *") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => spark.sql("set hoodie.payload.combined.schema.validate = true") val tableName = generateTableName // Create table @@ -326,11 +326,11 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { Seq(3, "a3", 102.0, 1000, "2021-05-05"), Seq(4, "a4", 100.0, 1000, "2021-05-06") ) - }) + } } test("Test MergeInto For Source Table With Column Aliases") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => spark.sql("set hoodie.payload.combined.schema.validate = true") val tableName = generateTableName // Create table @@ -370,7 +370,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { Seq(1, "a1", 10.0, 1000) ) } - }) + } } /* TODO [HUDI-6472] @@ -556,7 +556,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { */ test("Test only insert when source table contains history") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => spark.sql("set hoodie.payload.combined.schema.validate = true") val tableName = generateTableName // Create table @@ -598,7 +598,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { Seq(1, "a1", 1.0, 10, "2022-08-18"), Seq(2, "a2", 10.0, 100, "2022-08-18") ) - }) + } } test("Test only insert when source table contains history and target table has multiple keys") { @@ -649,7 +649,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { } test("Test Merge Into For Source Table With Different Column Order") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => val tableName = generateTableName // Create a mor partitioned table. spark.sql( @@ -683,7 +683,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { checkAnswer(s"select id,name,price,dt from $tableName")( Seq(1, "a1", 10, "2021-03-21") ) - }) + } } test("Test Merge into with String cast to Double") { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala index 419bb43de43..dae2dda4bfa 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala @@ -247,7 +247,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit test("Test pkless multiple source match") { for (withPrecombine <- Seq(true, false)) { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => spark.sql("set hoodie.payload.combined.schema.validate = true") val tableName = generateTableName @@ -292,13 +292,13 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit Seq(1, "a1", 30.0, 100) ) } - }) + } } } test("Test MergeInto Basic pkless") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => spark.sql("set hoodie.payload.combined.schema.validate = true") spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true") val tableName = generateTableName @@ -385,6 +385,6 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit """.stripMargin) val cnt = spark.sql(s"select * from $tableName where id = 1").count() assertResult(0)(cnt) - }) + } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala index 6a64c69021c..bfd14ae4c5a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala @@ -143,7 +143,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { } test("Test alter column types 2") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow", "mor").foreach { tableType => val tableName = generateTableName val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}" @@ -176,7 +176,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { ) } } - }) + } } test("Test Enable and Disable Schema on read") { @@ -232,7 +232,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { } test("Test alter table properties and add rename drop column") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow", "mor").foreach { tableType => val tableName = generateTableName val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}" @@ -336,7 +336,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { } } spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key) - }) + } } test("Test Chinese table ") { @@ -393,7 +393,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { test("Test alter column by add rename and drop") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => Seq("cow", "mor").foreach { tableType => val tableName = generateTableName val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}" @@ -453,7 +453,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { validateInternalSchema(tablePath, isDropColumn = false, currentMaxColumnId = maxColumnId) } } - }) + } } private def validateInternalSchema(basePath: String, isDropColumn: Boolean, currentMaxColumnId: Int): Unit = { @@ -543,7 +543,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { } test("Test alter column with complex schema") { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => withSQLConf(s"$SPARK_SQL_INSERT_INTO_OPERATION" -> "upsert", "hoodie.schema.on.read.enable" -> "true", "spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") { @@ -628,7 +628,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { ) } } - }) + } } test("Test schema auto evolution complex") { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala index 73bad3be282..e6275d22e62 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala @@ -69,7 +69,7 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase { test("Test Insert Into Records with time travel To new Table") { if (HoodieSparkUtils.gteqSpark3_2) { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => // Create Non-Partitioned table val tableName1 = generateTableName spark.sql( @@ -138,7 +138,7 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase { Seq(1, "a1", 10.0, 1000, "2022-02-14"), Seq(2, "a2", 10.0, 1000, "2022-02-15") ) - }) + } } } @@ -238,18 +238,18 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase { test("Test Unsupported syntax can be parsed") { if (HoodieSparkUtils.gteqSpark3_2) { checkAnswer("select 1 distribute by 1")(Seq(1)) - withRecordType()(withTempDir { dir => + withTempDir { dir => val path = dir.toURI.getPath spark.sql(s"insert overwrite local directory '$path' using parquet select 1") // Requires enable hive support, so didn't test it // spark.sql(s"insert overwrite local directory '$path' stored as orc select 1") - }) + } } } test("Test Select Record with time travel and Repartition") { if (HoodieSparkUtils.gteqSpark3_2) { - withRecordType()(withTempDir { tmp => + withTempDir { tmp => val tableName = generateTableName spark.sql( s""" @@ -289,7 +289,7 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase { s"select id, name, price, ts from $tableName TIMESTAMP AS OF '$instant1' distribute by cast(rand() * 2 as int)")( Seq(1, "a1", 10.0, 1000) ) - }) + } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala index 0c2c34ae6d9..7c7fc70d3f3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala @@ -233,8 +233,8 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { } test("Test ignoring case for Update Table") { - withRecordType()(withTempDir { tmp => - Seq("cow", "mor").foreach {tableType => + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => val tableName = generateTableName // create table spark.sql( @@ -270,7 +270,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { Seq(1, "a1", 40.0, 1000) ) } - }) + } } test("Test decimal type") { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 502baf34ff4..263389af698 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -644,12 +644,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", recordType); } - @ParameterizedTest - @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"}) - public void testUpsertsCOW_ContinuousModeDisabled(HoodieRecordType recordType) throws Exception { + @Test + public void testUpsertsCOW_ContinuousModeDisabled() throws Exception { String tableBasePath = basePath + "/non_continuous_cow"; HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); - addRecordMerger(recordType, cfg.configs); cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true")); cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.INMEMORY.name())); @@ -675,12 +673,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor", recordType); } - @ParameterizedTest - @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"}) - public void testUpsertsMOR_ContinuousModeDisabled(HoodieRecordType recordType) throws Exception { + @Test + public void testUpsertsMOR_ContinuousModeDisabled() throws Exception { String tableBasePath = basePath + "/non_continuous_mor"; HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); - addRecordMerger(recordType, cfg.configs); cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true")); cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.INMEMORY.name())); @@ -878,8 +874,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { } @ParameterizedTest - @CsvSource(value = {"true, AVRO", "true, SPARK", "false, AVRO", "false, SPARK"}) - public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean, HoodieRecordType recordType) throws Exception { + @ValueSource(booleans = {true, false}) + public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws Exception { String tableBasePath = basePath + "/cleanerDeleteReplacedDataWithArchive" + asyncClean; int totalRecords = 3000; @@ -887,7 +883,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // Step 1 : Prepare and insert data without archival and cleaner. // Make sure that there are 6 commits including 2 replacecommits completed. HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); - addRecordMerger(recordType, cfg.configs); + addRecordMerger(HoodieRecordType.AVRO, cfg.configs); cfg.continuousMode = true; cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "true", "2", "", "")); @@ -956,7 +952,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { configs.add(String.format("%s=%s", HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName())); } - addRecordMerger(recordType, configs); + addRecordMerger(HoodieRecordType.AVRO, configs); cfg.configs = configs; cfg.continuousMode = false; // timeline as of now. no cleaner and archival kicked in. @@ -1188,16 +1184,15 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { } @Timeout(600) - @ParameterizedTest - @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"}) - public void testAsyncClusteringServiceWithCompaction(HoodieRecordType recordType) throws Exception { + @Test + public void testAsyncClusteringServiceWithCompaction() throws Exception { String tableBasePath = basePath + "/asyncClusteringCompaction"; // Keep it higher than batch-size to test continuous mode int totalRecords = 2000; // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); - addRecordMerger(recordType, cfg.configs); + addRecordMerger(HoodieRecordType.AVRO, cfg.configs); cfg.continuousMode = true; cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "", "", "true", "3")); @@ -1216,14 +1211,14 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { } @ParameterizedTest - @CsvSource(value = {"true, AVRO", "true, SPARK", "false, AVRO", "false, SPARK"}) - public void testAsyncClusteringJobWithRetry(boolean retryLastFailedClusteringJob, HoodieRecordType recordType) throws Exception { + @ValueSource(booleans = {true, false}) + public void testAsyncClusteringJobWithRetry(boolean retryLastFailedClusteringJob) throws Exception { String tableBasePath = basePath + "/asyncClustering3"; // ingest data int totalRecords = 3000; HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); - addRecordMerger(recordType, cfg.configs); + addRecordMerger(HoodieRecordType.AVRO, cfg.configs); cfg.continuousMode = false; cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "false", "0", "false", "0")); @@ -1251,7 +1246,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // trigger a scheduleAndExecute clustering job // when retryFailedClustering true => will rollback and re-execute failed clustering plan with same instant timestamp. // when retryFailedClustering false => will make and execute a new clustering plan with new instant timestamp. - HoodieClusteringJob scheduleAndExecute = initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute", retryLastFailedClusteringJob, recordType); + HoodieClusteringJob scheduleAndExecute = initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute", retryLastFailedClusteringJob, HoodieRecordType.AVRO); scheduleAndExecute.cluster(0); String completeClusteringTimeStamp = meta.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get().getTimestamp(); @@ -1265,11 +1260,11 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { } @ParameterizedTest - @CsvSource(value = {"execute, AVRO", "schedule, AVRO", "scheduleAndExecute, AVRO", "execute, SPARK", "schedule, SPARK", "scheduleAndExecute, SPARK"}) - public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode, HoodieRecordType recordType) throws Exception { + @ValueSource(strings = {"execute", "schedule", "scheduleAndExecute"}) + public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode) throws Exception { String tableBasePath = basePath + "/asyncClustering2"; - HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "false", recordType, WriteOperationType.BULK_INSERT); - HoodieClusteringJob scheduleClusteringJob = initialHoodieClusteringJob(tableBasePath, null, true, runningMode, recordType); + HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "false", HoodieRecordType.AVRO, WriteOperationType.BULK_INSERT); + HoodieClusteringJob scheduleClusteringJob = initialHoodieClusteringJob(tableBasePath, null, true, runningMode, HoodieRecordType.AVRO); deltaStreamerTestRunner(ds, (r) -> { Exception exception = null; @@ -1475,9 +1470,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { * step involves using a SQL template to transform a source TEST-DATA-SOURCE ============================> HUDI TABLE * 1 ===============> HUDI TABLE 2 (incr-pull with transform) (incr-pull) Hudi Table 1 is synced with Hive. */ - @ParameterizedTest - @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"}) - public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline(HoodieRecordType recordType) throws Exception { + @Test + public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() throws Exception { + HoodieRecordType recordType = HoodieRecordType.AVRO; String tableBasePath = basePath + "/" + recordType.toString() + "/test_table2"; String downstreamTableBasePath = basePath + "/" + recordType.toString() + "/test_downstream_table2"; @@ -1648,14 +1643,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { assertFalse(props.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key())); } - @ParameterizedTest - @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"}) - public void testFilterDupes(HoodieRecordType recordType) throws Exception { + @Test + public void testFilterDupes() throws Exception { String tableBasePath = basePath + "/test_dupes_table"; // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT); - addRecordMerger(recordType, cfg.configs); new HoodieDeltaStreamer(cfg, jsc).sync(); assertRecordCount(1000, tableBasePath, sqlContext); TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); @@ -1676,7 +1669,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { HoodieTableMetaClient mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get(); HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT); - addRecordMerger(recordType, cfg2.configs); + addRecordMerger(HoodieRecordType.AVRO, cfg2.configs); cfg2.filterDupes = false; cfg2.sourceLimit = 2000; cfg2.operation = WriteOperationType.UPSERT; @@ -2838,9 +2831,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { } @ParameterizedTest - @CsvSource(value = {"COPY_ON_WRITE, AVRO", "MERGE_ON_READ, AVRO", - "COPY_ON_WRITE, SPARK", "MERGE_ON_READ, SPARK"}) - public void testConfigurationHotUpdate(HoodieTableType tableType, HoodieRecordType recordType) throws Exception { + @EnumSource(HoodieTableType.class) + public void testConfigurationHotUpdate(HoodieTableType tableType) throws Exception { + HoodieRecordType recordType = HoodieRecordType.AVRO; String tableBasePath = basePath + String.format("/configurationHotUpdate_%s_%s", tableType.name(), recordType.name()); HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
