This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7667af6e31a [HUDI-7035] Fix CDC Incremental Read When First Write Contains Delete And Upsert (#10071)
7667af6e31a is described below

commit 7667af6e31aaae851185b7393ae396c952235c3c
Author: watermelon12138 <[email protected]>
AuthorDate: Tue Nov 14 10:25:42 2023 +0800

    [HUDI-7035] Fix CDC Incremental Read When First Write Contains Delete And Upsert (#10071)
---
 .../hudi/common/table/cdc/HoodieCDCExtractor.java  |  3 +-
 .../functional/cdc/TestCDCDataFrameSuite.scala     | 94 ++++++++++++++++++++++
 2 files changed, 95 insertions(+), 2 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index f597066d7f7..eccffa36f25 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -266,8 +266,7 @@ public class HoodieCDCExtractor {
           );
           FileSlice beforeFileSlice = new FileSlice(fileGroupId, writeStat.getPrevCommit(), beforeBaseFile, Collections.emptyList());
           cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_DELETE, new ArrayList<>(), Option.of(beforeFileSlice), Option.empty());
-        } else if (writeStat.getNumUpdateWrites() == 0L && writeStat.getNumDeletes() == 0
-            && writeStat.getNumWrites() == writeStat.getNumInserts()) {
+        } else if ((writeStat.getNumUpdateWrites() == 0L && writeStat.getNumWrites() == writeStat.getNumInserts())) {
           // all the records in this file are new.
           cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_INSERT, path);
         } else {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
index baf396f9232..210ea00048e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -20,12 +20,15 @@ package org.apache.hudi.functional.cdc
 
 import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.{MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
+import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.OP_KEY_ONLY
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils.schemaBySupplementalLoggingMode
 import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
@@ -753,4 +756,95 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
       .save(basePath)
     assertFalse(isFilesExistInFileSystem(cdcLogFiles2))
   }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
+  def testCDCWhenFirstWriteContainsUpsertAndDelete(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = {
+      val schema = StructType(List(
+        StructField("_id", StringType, nullable = true),
+        StructField("Op", StringType, nullable = true),
+        StructField("replicadmstimestamp", StringType, nullable = true),
+        StructField("code", StringType, nullable = true),
+        StructField("partition", StringType, nullable = true)
+      ))
+
+      val rdd1 = spark.sparkContext.parallelize(Seq(
+        Row("1", "I", "2023-06-14 15:46:06.953746", "A", "A"),
+        Row("1", "U", "2023-06-20 15:46:06.953746", "A", "A"),
+        Row("2", "I", "2023-06-14 15:46:06.953746", "A", "A"),
+        Row("2", "D", "2023-06-20 15:46:06.953746", "A", "A")
+      ))
+      val df1 = spark.createDataFrame(rdd1, schema)
+      df1.write.format("hudi")
+        .option(DataSourceWriteOptions.TABLE_TYPE.key(), MOR_TABLE_TYPE_OPT_VAL)
+        .options(getQuickstartWriteConfigs)
+        .option(RECORDKEY_FIELD_OPT_KEY, "_id")
+        .option(PRECOMBINE_FIELD_OPT_KEY, "replicadmstimestamp")
+        .option(PARTITIONPATH_FIELD_OPT_KEY, "partition")
+        .option(HoodieWriteConfig.TBL_NAME.key(), tableName + loggingMode.name())
+        .option("hoodie.datasource.write.operation", "upsert")
+        .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
+        .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.AWSDmsAvroPayload")
+        .option("hoodie.table.cdc.enabled", "true")
+        .option("hoodie.table.cdc.supplemental.logging.mode", loggingMode.name())
+        .mode(SaveMode.Append).save(basePath)
+
+      val rdd2 = spark.sparkContext.parallelize(Seq(
+        Row("1", "U", "2023-06-14 15:46:06.953746", "A", "A"),
+        Row("2", "U", "2023-06-20 15:46:06.953746", "A", "A"),
+        Row("3", "I", "2023-06-20 15:46:06.953746", "A", "A")
+      ))
+      val df2 = spark.createDataFrame(rdd2, schema)
+      df2.write.format("hudi")
+        .option(DataSourceWriteOptions.TABLE_TYPE.key(), MOR_TABLE_TYPE_OPT_VAL)
+        .options(getQuickstartWriteConfigs)
+        .option(RECORDKEY_FIELD_OPT_KEY, "_id")
+        .option(PRECOMBINE_FIELD_OPT_KEY, "replicadmstimestamp")
+        .option(PARTITIONPATH_FIELD_OPT_KEY, "partition")
+        .option(HoodieWriteConfig.TBL_NAME.key(), tableName + loggingMode.name())
+        .option("hoodie.datasource.write.operation", "upsert")
+        .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
+        .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.AWSDmsAvroPayload")
+        .option("hoodie.table.cdc.enabled", "true")
+        .option("hoodie.table.cdc.supplemental.logging.mode", loggingMode.name())
+        .mode(SaveMode.Append).save(basePath)
+
+      val hadoopConf = spark.sessionState.newHadoopConf()
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(hadoopConf)
+        .build()
+      val startTimeStamp = metaClient.reloadActiveTimeline().firstInstant().get.getTimestamp
+      val latestTimeStamp = metaClient.reloadActiveTimeline().lastInstant().get.getTimestamp
+
+      val result1 = spark.read.format("hudi")
+        .option("hoodie.datasource.query.type", "incremental")
+        .option("hoodie.datasource.read.begin.instanttime", "0")
+        .option("hoodie.datasource.read.end.instanttime", startTimeStamp)
+        .option("hoodie.datasource.query.incremental.format", "cdc")
+        .load(basePath)
+      result1.show(false)
+      assertCDCOpCnt(result1, 1, 0, 0)
+      assertEquals(result1.count(), 1)
+
+      val result2 = spark.read.format("hudi")
+        .option("hoodie.datasource.query.type", "incremental")
+        .option("hoodie.datasource.read.begin.instanttime", startTimeStamp)
+        .option("hoodie.datasource.read.end.instanttime", latestTimeStamp)
+        .option("hoodie.datasource.query.incremental.format", "cdc")
+        .load(basePath)
+      result2.show(false)
+      assertCDCOpCnt(result2, 2, 1, 0)
+      assertEquals(result2.count(), 3)
+
+      val result3 = spark.read.format("hudi")
+        .option("hoodie.datasource.query.type", "incremental")
+        .option("hoodie.datasource.read.begin.instanttime", "0")
+        .option("hoodie.datasource.read.end.instanttime", latestTimeStamp)
+        .option("hoodie.datasource.query.incremental.format", "cdc")
+        .load(basePath)
+      result3.show(false)
+      assertCDCOpCnt(result3, 3, 1, 0)
+      assertEquals(result3.count(), 4)
+    }
 }

Reply via email to