lw309637554 commented on a change in pull request #2196:
URL: https://github.com/apache/hudi/pull/2196#discussion_r518175216
##########
File path:
hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##########
@@ -156,6 +162,76 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be
pulled
}
/**
 * Verifies that writing with SaveMode.Overwrite records a "replacecommit"
 * action on the timeline, while a plain SaveMode.Append insert records a
 * "commit" action.
 */
@Test def testOverWriteModeUseReplaceAction(): Unit = {
  // First batch: 5 inserts appended to the table -> expected to show up as a "commit".
  val firstBatch = recordsToStrings(dataGen.generateInserts("001", 5)).toList
  val firstDF = spark.read.json(spark.sparkContext.parallelize(firstBatch, 2))
  firstDF.write.format("org.apache.hudi")
    .options(commonOpts)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
    .mode(SaveMode.Append)
    .save(basePath)

  // Second batch: same insert operation, but SaveMode.Overwrite -> expected "replacecommit".
  val secondBatch = recordsToStrings(dataGen.generateInserts("002", 5)).toList
  val secondDF = spark.read.json(spark.sparkContext.parallelize(secondBatch, 2))
  secondDF.write.format("org.apache.hudi")
    .options(commonOpts)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
    .mode(SaveMode.Overwrite)
    .save(basePath)

  // Read back the completed instants from the active timeline and check the
  // two actions appear in write order.
  // NOTE: the cast is needed because the Java toArray returns Array[AnyRef].
  val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
  val actions = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
    .map(instant => (instant.asInstanceOf[HoodieInstant]).getAction)
  assertEquals(2, actions.size)
  assertEquals("commit", actions(0))
  assertEquals("replacecommit", actions(1))
}
+
+ @Test def testOverWriteModeUseReplaceActionOnDisJointPartitions(): Unit = {
+ // step1: Write 5 records to hoodie table for partition1
DEFAULT_FIRST_PARTITION_PATH
+ val records1 = recordsToStrings(dataGen.generateInsertsForPartition("001",
5, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ inputDF1.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ // step2: Write 7 more records using SaveMode.Overwrite for partition2
DEFAULT_SECOND_PARTITION_PATH
+ val records2 = recordsToStrings(dataGen.generateInsertsForPartition("002",
7, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).toList
+ val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+ inputDF2.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ inputDF2.registerTempTable("tmpTable")
+
+ // step3: Query the rows count from hoodie table for partition1
DEFAULT_FIRST_PARTITION_PATH
+ val recordCountForParititon1 = spark.sql(String.format("select count(*)
from tmpTable where partition = '%s'",
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()
Review comment:
will fix it
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]