This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7667af6e31a [HUDI-7035] Fix CDC Incremental Read When First Write Contains Delete And Upsert (#10071)
7667af6e31a is described below
commit 7667af6e31aaae851185b7393ae396c952235c3c
Author: watermelon12138 <[email protected]>
AuthorDate: Tue Nov 14 10:25:42 2023 +0800
[HUDI-7035] Fix CDC Incremental Read When First Write Contains Delete And Upsert (#10071)
---
.../hudi/common/table/cdc/HoodieCDCExtractor.java | 3 +-
.../functional/cdc/TestCDCDataFrameSuite.scala | 94 ++++++++++++++++++++++
2 files changed, 95 insertions(+), 2 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index f597066d7f7..eccffa36f25 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -266,8 +266,7 @@ public class HoodieCDCExtractor {
);
FileSlice beforeFileSlice = new FileSlice(fileGroupId,
writeStat.getPrevCommit(), beforeBaseFile, Collections.emptyList());
cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_DELETE,
new ArrayList<>(), Option.of(beforeFileSlice), Option.empty());
- } else if (writeStat.getNumUpdateWrites() == 0L &&
writeStat.getNumDeletes() == 0
- && writeStat.getNumWrites() == writeStat.getNumInserts()) {
+ } else if ((writeStat.getNumUpdateWrites() == 0L &&
writeStat.getNumWrites() == writeStat.getNumInserts())) {
// all the records in this file are new.
cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_INSERT,
path);
} else {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
index baf396f9232..210ea00048e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -20,12 +20,15 @@ package org.apache.hudi.functional.cdc
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.{MOR_TABLE_TYPE_OPT_VAL,
PARTITIONPATH_FIELD_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
+import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
import
org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.OP_KEY_ONLY
import
org.apache.hudi.common.table.cdc.HoodieCDCUtils.schemaBySupplementalLoggingMode
import org.apache.hudi.common.table.cdc.{HoodieCDCOperation,
HoodieCDCSupplementalLoggingMode}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import
org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings,
recordsToStrings}
+import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
@@ -753,4 +756,95 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
.save(basePath)
assertFalse(isFilesExistInFileSystem(cdcLogFiles2))
}
+
+ @ParameterizedTest
+ @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
+ def testCDCWhenFirstWriteContainsUpsertAndDelete(loggingMode:
HoodieCDCSupplementalLoggingMode): Unit = {
+ val schema = StructType(List(
+ StructField("_id", StringType, nullable = true),
+ StructField("Op", StringType, nullable = true),
+ StructField("replicadmstimestamp", StringType, nullable = true),
+ StructField("code", StringType, nullable = true),
+ StructField("partition", StringType, nullable = true)
+ ))
+
+ val rdd1 = spark.sparkContext.parallelize(Seq(
+ Row("1", "I", "2023-06-14 15:46:06.953746", "A", "A"),
+ Row("1", "U", "2023-06-20 15:46:06.953746", "A", "A"),
+ Row("2", "I", "2023-06-14 15:46:06.953746", "A", "A"),
+ Row("2", "D", "2023-06-20 15:46:06.953746", "A", "A")
+ ))
+ val df1 = spark.createDataFrame(rdd1, schema)
+ df1.write.format("hudi")
+ .option(DataSourceWriteOptions.TABLE_TYPE.key(),
MOR_TABLE_TYPE_OPT_VAL)
+ .options(getQuickstartWriteConfigs)
+ .option(RECORDKEY_FIELD_OPT_KEY, "_id")
+ .option(PRECOMBINE_FIELD_OPT_KEY, "replicadmstimestamp")
+ .option(PARTITIONPATH_FIELD_OPT_KEY, "partition")
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName +
loggingMode.name())
+ .option("hoodie.datasource.write.operation", "upsert")
+ .option("hoodie.datasource.write.keygenerator.class",
"org.apache.hudi.keygen.ComplexKeyGenerator")
+ .option("hoodie.datasource.write.payload.class",
"org.apache.hudi.common.model.AWSDmsAvroPayload")
+ .option("hoodie.table.cdc.enabled", "true")
+ .option("hoodie.table.cdc.supplemental.logging.mode",
loggingMode.name())
+ .mode(SaveMode.Append).save(basePath)
+
+ val rdd2 = spark.sparkContext.parallelize(Seq(
+ Row("1", "U", "2023-06-14 15:46:06.953746", "A", "A"),
+ Row("2", "U", "2023-06-20 15:46:06.953746", "A", "A"),
+ Row("3", "I", "2023-06-20 15:46:06.953746", "A", "A")
+ ))
+ val df2 = spark.createDataFrame(rdd2, schema)
+ df2.write.format("hudi")
+ .option(DataSourceWriteOptions.TABLE_TYPE.key(),
MOR_TABLE_TYPE_OPT_VAL)
+ .options(getQuickstartWriteConfigs)
+ .option(RECORDKEY_FIELD_OPT_KEY, "_id")
+ .option(PRECOMBINE_FIELD_OPT_KEY, "replicadmstimestamp")
+ .option(PARTITIONPATH_FIELD_OPT_KEY, "partition")
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName +
loggingMode.name())
+ .option("hoodie.datasource.write.operation", "upsert")
+ .option("hoodie.datasource.write.keygenerator.class",
"org.apache.hudi.keygen.ComplexKeyGenerator")
+ .option("hoodie.datasource.write.payload.class",
"org.apache.hudi.common.model.AWSDmsAvroPayload")
+ .option("hoodie.table.cdc.enabled", "true")
+ .option("hoodie.table.cdc.supplemental.logging.mode",
loggingMode.name())
+ .mode(SaveMode.Append).save(basePath)
+
+ val hadoopConf = spark.sessionState.newHadoopConf()
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(hadoopConf)
+ .build()
+ val startTimeStamp =
metaClient.reloadActiveTimeline().firstInstant().get.getTimestamp
+ val latestTimeStamp =
metaClient.reloadActiveTimeline().lastInstant().get.getTimestamp
+
+ val result1 = spark.read.format("hudi")
+ .option("hoodie.datasource.query.type", "incremental")
+ .option("hoodie.datasource.read.begin.instanttime", "0")
+ .option("hoodie.datasource.read.end.instanttime", startTimeStamp)
+ .option("hoodie.datasource.query.incremental.format", "cdc")
+ .load(basePath)
+ result1.show(false)
+ assertCDCOpCnt(result1, 1, 0, 0)
+ assertEquals(result1.count(), 1)
+
+ val result2 = spark.read.format("hudi")
+ .option("hoodie.datasource.query.type", "incremental")
+ .option("hoodie.datasource.read.begin.instanttime", startTimeStamp)
+ .option("hoodie.datasource.read.end.instanttime", latestTimeStamp)
+ .option("hoodie.datasource.query.incremental.format", "cdc")
+ .load(basePath)
+ result2.show(false)
+ assertCDCOpCnt(result2, 2, 1, 0)
+ assertEquals(result2.count(), 3)
+
+ val result3 = spark.read.format("hudi")
+ .option("hoodie.datasource.query.type", "incremental")
+ .option("hoodie.datasource.read.begin.instanttime", "0")
+ .option("hoodie.datasource.read.end.instanttime", latestTimeStamp)
+ .option("hoodie.datasource.query.incremental.format", "cdc")
+ .load(basePath)
+ result3.show(false)
+ assertCDCOpCnt(result3, 3, 1, 0)
+ assertEquals(result3.count(), 4)
+ }
}