This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c7718800925 [HUDI-6766] Fixing mysql debezium data loss (#9475)
c7718800925 is described below
commit c77188009252bfaea370a9bfefa68a8c02eca976
Author: Sandeep Parwal <[email protected]>
AuthorDate: Mon Sep 4 19:36:03 2023 -0700
[HUDI-6766] Fixing mysql debezium data loss (#9475)
---
.../model/debezium/MySqlDebeziumAvroPayload.java | 29 +++++++++++++++++++---
.../debezium/TestMySqlDebeziumAvroPayload.java | 6 +++++
2 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java
index a0a6304fa40..fceafee554c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java
@@ -66,8 +66,31 @@ public class MySqlDebeziumAvroPayload extends AbstractDebeziumAvroPayload {
new HoodieDebeziumAvroPayloadException(String.format("%s cannot be
null in insert record: %s",
DebeziumConstants.ADDED_SEQ_COL_NAME, insertRecord)));
Option<String> currentSourceSeqOpt = extractSeq(currentRecord);
- // Pick the current value in storage only if its Seq (file+pos) is latest
- // compared to the Seq (file+pos) of the insert value
- return currentSourceSeqOpt.isPresent() &&
insertSourceSeq.compareTo(currentSourceSeqOpt.get()) < 0;
+
+ // handle bootstrap case
+ if (!currentSourceSeqOpt.isPresent()) {
+ return false;
+ }
+
+ // Seq is file+pos string like "001.000010", getting [001,000010] from it
+ String[] currentFilePos = currentSourceSeqOpt.get().split("\\.");
+ String[] insertFilePos = insertSourceSeq.split("\\.");
+
+ long currentFileNum = Long.valueOf(currentFilePos[0]);
+ long insertFileNum = Long.valueOf(insertFilePos[0]);
+
+ if (insertFileNum < currentFileNum) {
+ // pick the current value
+ return true;
+ } else if (insertFileNum > currentFileNum) {
+ // pick the insert value
+ return false;
+ }
+
+ // file name is the same, compare the position in the file
+ Long currentPos = Long.valueOf(currentFilePos[1]);
+ Long insertPos = Long.valueOf(insertFilePos[1]);
+
+ return insertPos <= currentPos;
}
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java
index f5c3563f064..e257e2bee02 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java
@@ -96,6 +96,12 @@ public class TestMySqlDebeziumAvroPayload {
payload = new MySqlDebeziumAvroPayload(lateRecord, "00000.222");
mergedRecord = payload.combineAndGetUpdateValue(existingRecord,
avroSchema);
validateRecord(mergedRecord, 1, Operation.INSERT, "00001.111");
+
+ GenericRecord originalRecord = createRecord(1, Operation.INSERT,
"00000.23");
+ payload = new MySqlDebeziumAvroPayload(originalRecord, "00000.23");
+ updateRecord = createRecord(1, Operation.UPDATE, "00000.123");
+ mergedRecord = payload.combineAndGetUpdateValue(updateRecord, avroSchema);
+ validateRecord(mergedRecord, 1, Operation.UPDATE, "00000.123");
}
@Test