This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c892ed0131d [HUDI-6387] Fix Spark CDC when delete all records from a 
filegroup (#8990)
c892ed0131d is described below

commit c892ed0131dd044cc845e26e8d0f0f3b85e015f4
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Jun 16 02:13:17 2023 -0400

    [HUDI-6387] Fix Spark CDC when delete all records from a filegroup (#8990)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../hudi/common/table/cdc/HoodieCDCExtractor.java  |  2 +-
 .../apache/spark/sql/hudi/TestCDCForSparkSQL.scala | 45 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index cb5b3c75f36..b536530577c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -265,7 +265,7 @@ public class HoodieCDCExtractor {
               new HoodieIOException("Can not get the previous version of the 
base file")
           );
           FileSlice beforeFileSlice = new FileSlice(fileGroupId, 
writeStat.getPrevCommit(), beforeBaseFile, Collections.emptyList());
-          cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_DELETE, 
new ArrayList<>(), Option.empty(), Option.of(beforeFileSlice));
+          cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_DELETE, 
new ArrayList<>(), Option.of(beforeFileSlice), Option.empty());
         } else if (writeStat.getNumUpdateWrites() == 0L && 
writeStat.getNumDeletes() == 0
             && writeStat.getNumWrites() == writeStat.getNumInserts()) {
           // all the records in this file are new.
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
index f82581244be..c8fc8b7dfea 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
@@ -44,6 +44,51 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
     assertEquals(expectedDeletedCnt, cdcData.where("op = 'd'").count())
   }
 
+  test("Test delete all records in filegroup") {
+    withTempDir { tmp =>
+      val databaseName = "hudi_database"
+      spark.sql(s"create database if not exists $databaseName")
+      spark.sql(s"use $databaseName")
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      spark.sql(
+        s"""
+           | create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           | ) using hudi
+           | partitioned by (name)
+           | tblproperties (
+           |   'primaryKey' = 'id',
+           |   'preCombineField' = 'ts',
+           |   'hoodie.table.cdc.enabled' = 'true',
+           |   'hoodie.table.cdc.supplemental.logging.mode' = 
'$DATA_BEFORE_AFTER',
+           |   type = 'cow'
+           | )
+           | location '$basePath'
+      """.stripMargin)
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(spark.sessionState.newHadoopConf())
+        .build()
+      spark.sql(s"insert into $tableName values (1, 11, 1000, 'a1'), (2, 12, 
1000, 'a2')")
+      assert(spark.sql(s"select _hoodie_file_name from 
$tableName").distinct().count() == 2)
+      val fgForID1 = spark.sql(s"select _hoodie_file_name from $tableName 
where id=1").head().get(0)
+      val commitTime1 = 
metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
+      val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
+      cdcDataOnly1.show(false)
+      assertCDCOpCnt(cdcDataOnly1, 2, 0, 0)
+
+      spark.sql(s"delete from $tableName where id = 1")
+      val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong)
+      assertCDCOpCnt(cdcDataOnly2, 0, 0, 1)
+      assert(spark.sql(s"select _hoodie_file_name from 
$tableName").distinct().count() == 1)
+      assert(!spark.sql(s"select _hoodie_file_name from 
$tableName").head().get(0).equals(fgForID1))
+    }
+  }
+
   /**
    * Test CDC in cases that it's a COW/MOR non--partitioned table and 
`cdcSupplementalLoggingMode` is true or not.
    */

Reply via email to