lw309637554 commented on a change in pull request #2196:
URL: https://github.com/apache/hudi/pull/2196#discussion_r518175216



##########
File path: hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##########
@@ -156,6 +162,76 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
   }
 
+  @Test def testOverWriteModeUseReplaceAction(): Unit = {
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val records2 = recordsToStrings(dataGen.generateInserts("002", 5)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
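+    // SaveMode.Overwrite on an existing table is expected to go through the
+    // replace action, so the timeline should show the first (Append) write as
+    // a plain "commit" and the second as a "replacecommit".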
+    val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
+    val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
+      .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
+    assertEquals(2, commits.size)
+    assertEquals("commit", commits(0))
+    assertEquals("replacecommit", commits(1))
+  }
+
+  @Test def testOverWriteModeUseReplaceActionOnDisjointPartitions(): Unit = {
+    // step1: Write 5 records to the Hudi table for partition1 DEFAULT_FIRST_PARTITION_PATH
+    val records1 = recordsToStrings(dataGen.generateInsertsForPartition("001", 5, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    // step2: Write 7 more records using SaveMode.Overwrite for partition2 DEFAULT_SECOND_PARTITION_PATH
+    val records2 = recordsToStrings(dataGen.generateInsertsForPartition("002", 7, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    inputDF2.registerTempTable("tmpTable")
+
+    // step3: Query the row count from the Hudi table for partition1 DEFAULT_FIRST_PARTITION_PATH
+    val recordCountForPartition1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()

Review comment:
       will fix it
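
For reference, a minimal sketch of one possible fix, assuming the intent of
step3 (per its comment) is to count partition1 rows in the Hudi table itself
rather than in tmpTable, which is registered from inputDF2 and therefore only
holds partition2 records. The glob depth matches the test generator's
yyyy/mm/dd partition layout; the names below are illustrative assumptions,
not the PR's final code:

    // Hypothetical sketch of a fix: read the table back through the
    // datasource so the temp view reflects the table state on disk,
    // then count what is left in partition1 after the overwrite.
    val hudiDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
    hudiDF.createOrReplaceTempView("tmpTable")
    val recordCountForPartition1 = spark.sql(String.format(
      "select count(*) from tmpTable where partition = '%s'",
      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()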



