This is an automated email from the ASF dual-hosted git repository.

wenningd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 6943004  [HUDI-2359] Add basic "hoodie_is_deleted" unit tests to TestDataSource classes
     new 69cbcc9  Merge pull request #3541 from rahil-c/rahil-c/HUDI-2359
6943004 is described below

commit 694300477f61a9169c15cac1ddf67368dbf5dd1b
Author: Rahil Chertara <rcher...@amazon.com>
AuthorDate: Sun Aug 22 21:55:11 2021 -0700

    [HUDI-2359] Add basic "hoodie_is_deleted" unit tests to TestDataSource classes
---
 .../apache/hudi/functional/TestCOWDataSource.scala | 27 ++++++++++++++++
 .../apache/hudi/functional/TestMORDataSource.scala | 37 ++++++++++++++++++++++
 2 files changed, 64 insertions(+)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index cbd05eb..efc1430 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -782,6 +782,33 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(enableDropPartitionColumns, !resultContainPartitionColumn)
   }
 
+  @Test
+  def testHoodieIsDeletedCOW(): Unit = {
+    val numRecords = 100
+    val numRecordsToDelete = 2
+    val records0 = recordsToStrings(dataGen.generateInserts("000", numRecords)).toList
+    val df0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
+    df0.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val snapshotDF0 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(numRecords, snapshotDF0.count())
+
+    val df1 = snapshotDF0.limit(numRecordsToDelete)
+    val dropDf = df1.drop(df1.columns.filter(_.startsWith("_hoodie_")): _*)
+    val df2 = dropDf.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType))
+    df2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val snapshotDF2 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
+  }
+
   def copyOnWriteTableSelect(enableDropPartitionColumns: Boolean): Boolean = {
     val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 3)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 1bd1c93..f9409e0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -33,6 +33,7 @@ import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDat
 import org.apache.log4j.LogManager
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.BooleanType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -783,4 +784,40 @@ class TestMORDataSource extends HoodieClientTestBase {
     val fs = tempPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
     assertEquals(true, fs.listStatus(tempPath).isEmpty)
   }
+
+  @Test
+  def testHoodieIsDeletedMOR(): Unit = {
+    val numRecords = 100
+    val numRecordsToDelete = 2
+    val schema = HoodieTestDataGenerator.SHORT_TRIP_SCHEMA
+    val records0 = recordsToStrings(dataGen.generateInsertsAsPerSchema("000", numRecords, schema)).toList
+    val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
+    inputDF0.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false")
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val snapshotDF0 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(numRecords, snapshotDF0.count())
+
+    val df1 = snapshotDF0.limit(numRecordsToDelete)
+    val dropDf = df1.drop(df1.columns.filter(_.startsWith("_hoodie_")): _*)
+
+    val df2 = dropDf.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType))
+    df2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val snapshotDF2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
+  }
 }
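
Note (not part of the commit): both new tests exercise the same soft-delete pattern, writing back a batch of existing rows with the "_hoodie_is_deleted" boolean column set to true so those records disappear from later snapshot reads. The sketch below condenses that flow outside the test harness; `spark`, `hudiOpts`, `tablePath`, and the `softDelete` helper are illustrative names assumed for the example, not identifiers from this patch.

    // Minimal sketch, assuming a live SparkSession (`spark`), Hudi write options
    // (`hudiOpts`) matching the table's keys, and an existing table at `tablePath`.
    import org.apache.spark.sql.{DataFrame, SaveMode}
    import org.apache.spark.sql.functions.lit
    import org.apache.spark.sql.types.BooleanType

    def softDelete(toDelete: DataFrame, hudiOpts: Map[String, String], tablePath: String): Unit = {
      // Drop the Hudi metadata columns from the rows read back from the table,
      // then flag the remaining rows as deleted via "_hoodie_is_deleted".
      val payload = toDelete
        .drop(toDelete.columns.filter(_.startsWith("_hoodie_")): _*)
        .withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType))

      // An append (upsert) of these flagged rows removes them from subsequent snapshot queries.
      payload.write.format("org.apache.hudi")
        .options(hudiOpts)
        .mode(SaveMode.Append)
        .save(tablePath)
    }

The tests assert the effect by comparing snapshot counts before and after the flagged write (numRecords vs. numRecords - numRecordsToDelete).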