This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c892ed0131d [HUDI-6387] Fix Spark CDC when deleting all records from a filegroup (#8990)
c892ed0131d is described below
commit c892ed0131dd044cc845e26e8d0f0f3b85e015f4
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Jun 16 02:13:17 2023 -0400
[HUDI-6387] Fix Spark CDC when deleting all records from a filegroup (#8990)
Co-authored-by: Jonathan Vexler <=>
---
.../hudi/common/table/cdc/HoodieCDCExtractor.java | 2 +-
.../apache/spark/sql/hudi/TestCDCForSparkSQL.scala | 45 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index cb5b3c75f36..b536530577c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -265,7 +265,7 @@ public class HoodieCDCExtractor {
new HoodieIOException("Can not get the previous version of the
base file")
);
FileSlice beforeFileSlice = new FileSlice(fileGroupId,
writeStat.getPrevCommit(), beforeBaseFile, Collections.emptyList());
- cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_DELETE,
new ArrayList<>(), Option.empty(), Option.of(beforeFileSlice));
+ cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_DELETE,
new ArrayList<>(), Option.of(beforeFileSlice), Option.empty());
} else if (writeStat.getNumUpdateWrites() == 0L &&
writeStat.getNumDeletes() == 0
&& writeStat.getNumWrites() == writeStat.getNumInserts()) {
// all the records in this file are new.
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
index f82581244be..c8fc8b7dfea 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
@@ -44,6 +44,51 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
assertEquals(expectedDeletedCnt, cdcData.where("op = 'd'").count())
}
+ test("Test delete all records in filegroup") {
+ withTempDir { tmp =>
+ val databaseName = "hudi_database"
+ spark.sql(s"create database if not exists $databaseName")
+ spark.sql(s"use $databaseName")
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ | ) using hudi
+ | partitioned by (name)
+ | tblproperties (
+ | 'primaryKey' = 'id',
+ | 'preCombineField' = 'ts',
+ | 'hoodie.table.cdc.enabled' = 'true',
+ | 'hoodie.table.cdc.supplemental.logging.mode' =
'$DATA_BEFORE_AFTER',
+ | type = 'cow'
+ | )
+ | location '$basePath'
+ """.stripMargin)
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(spark.sessionState.newHadoopConf())
+ .build()
+ spark.sql(s"insert into $tableName values (1, 11, 1000, 'a1'), (2, 12,
1000, 'a2')")
+ assert(spark.sql(s"select _hoodie_file_name from
$tableName").distinct().count() == 2)
+ val fgForID1 = spark.sql(s"select _hoodie_file_name from $tableName
where id=1").head().get(0)
+ val commitTime1 =
metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
+ val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
+ cdcDataOnly1.show(false)
+ assertCDCOpCnt(cdcDataOnly1, 2, 0, 0)
+
+ spark.sql(s"delete from $tableName where id = 1")
+ val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong)
+ assertCDCOpCnt(cdcDataOnly2, 0, 0, 1)
+ assert(spark.sql(s"select _hoodie_file_name from
$tableName").distinct().count() == 1)
+ assert(!spark.sql(s"select _hoodie_file_name from
$tableName").head().get(0).equals(fgForID1))
+ }
+ }
+
/**
* Test CDC in cases that it's a COW/MOR non--partitioned table and
`cdcSupplementalLoggingMode` is true or not.
*/