This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 59f7d2806bf [HUDI-6562] Fixed issue for delete events for AWSDmsAvroPayload when CDC enabled (#9519)
59f7d2806bf is described below
commit 59f7d2806bfc2d402dc8f5694dcb9d345e3d5a55
Author: Aditya Goenka <[email protected]>
AuthorDate: Fri Sep 1 04:47:48 2023 +0530
[HUDI-6562] Fixed issue for delete events for AWSDmsAvroPayload when CDC enabled (#9519)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/io/HoodieMergeHandleWithChangeLog.java | 2 +-
.../functional/cdc/TestCDCDataFrameSuite.scala | 56 +++++++++++++++++++++-
2 files changed, 56 insertions(+), 2 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index d610891c2ca..f8669416f0c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -103,7 +103,7 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandl
// TODO Remove these unnecessary newInstance invocations
HoodieRecord<T> savedRecord = newRecord.newInstance();
super.writeInsertRecord(newRecord);
- if (!HoodieOperation.isDelete(newRecord.getOperation())) {
+ if (!HoodieOperation.isDelete(newRecord.getOperation()) && !savedRecord.isDelete(schema, config.getPayloadConfig().getProps())) {
cdcLogger.put(newRecord, null, savedRecord.toIndexedRecord(schema, config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData));
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
index 36629687106..aac836d8c3a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -26,7 +26,8 @@ import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplement
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
@@ -634,4 +635,57 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
val cdcDataOnly2 = cdcDataFrame((commitTime2.toLong - 1).toString)
assertCDCOpCnt(cdcDataOnly2, insertedCnt2, updatedCnt2, 0)
}
+
+ @ParameterizedTest
+ @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
+ def testCDCWithAWSDMSPayload(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = {
+ val options = Map(
+ "hoodie.table.name" -> "test",
+ "hoodie.datasource.write.recordkey.field" -> "id",
+ "hoodie.datasource.write.precombine.field" -> "replicadmstimestamp",
+ "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
+ "hoodie.datasource.write.partitionpath.field" -> "",
+ "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.AWSDmsAvroPayload",
+ "hoodie.table.cdc.enabled" -> "true",
+ "hoodie.table.cdc.supplemental.logging.mode" -> "data_before_after"
+ )
+
+ val data: Seq[(String, String, String, String)] = Seq(
+ ("1", "I", "2023-06-14 15:46:06.953746", "A"),
+ ("2", "I", "2023-06-14 15:46:07.953746", "B"),
+ ("3", "I", "2023-06-14 15:46:08.953746", "C")
+ )
+
+ val schema: StructType = StructType(Seq(
+ StructField("id", StringType),
+ StructField("Op", StringType),
+ StructField("replicadmstimestamp", StringType),
+ StructField("code", StringType)
+ ))
+
+ val df = spark.createDataFrame(data.map(Row.fromTuple), schema)
+ df.write
+ .format("org.apache.hudi")
+ .option("hoodie.datasource.write.operation", "upsert")
+ .options(options)
+ .mode("append")
+ .save(basePath)
+
+ assertEquals(spark.read.format("org.apache.hudi").load(basePath).count(), 3)
+
+ val newData: Seq[(String, String, String, String)] = Seq(
+ ("3", "D", "2023-06-14 15:47:09.953746", "B")
+ )
+
+ val newDf = spark.createDataFrame(newData.map(Row.fromTuple), schema)
+
+ newDf.write
+ .format("org.apache.hudi")
+ .option("hoodie.datasource.write.operation", "upsert")
+ .options(options)
+ .mode("append")
+ .save(basePath)
+
+ assertEquals(spark.read.format("org.apache.hudi").load(basePath).count(), 2)
+ }
}