This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 83cdca8bc5d6beabcd60b8f8717a3b0133920d67 Author: Sandeep Parwal <[email protected]> AuthorDate: Mon Sep 4 19:36:03 2023 -0700 [HUDI-6766] Fixing mysql debezium data loss (#9475) --- .../model/debezium/MySqlDebeziumAvroPayload.java | 29 +++++++++++++++++++--- .../debezium/TestMySqlDebeziumAvroPayload.java | 6 +++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java index a0a6304fa40..fceafee554c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java @@ -66,8 +66,31 @@ public class MySqlDebeziumAvroPayload extends AbstractDebeziumAvroPayload { new HoodieDebeziumAvroPayloadException(String.format("%s cannot be null in insert record: %s", DebeziumConstants.ADDED_SEQ_COL_NAME, insertRecord))); Option<String> currentSourceSeqOpt = extractSeq(currentRecord); - // Pick the current value in storage only if its Seq (file+pos) is latest - // compared to the Seq (file+pos) of the insert value - return currentSourceSeqOpt.isPresent() && insertSourceSeq.compareTo(currentSourceSeqOpt.get()) < 0; + + // handle bootstrap case + if (!currentSourceSeqOpt.isPresent()) { + return false; + } + + // Seq is file+pos string like "001.000010", getting [001,000010] from it + String[] currentFilePos = currentSourceSeqOpt.get().split("\\."); + String[] insertFilePos = insertSourceSeq.split("\\."); + + long currentFileNum = Long.valueOf(currentFilePos[0]); + long insertFileNum = Long.valueOf(insertFilePos[0]); + + if (insertFileNum < currentFileNum) { + // pick the current value + return true; + } else if (insertFileNum > currentFileNum) { + // pick the insert value + return false; + } + + // file name is the same, compare the position in the file + Long currentPos = Long.valueOf(currentFilePos[1]); + Long insertPos = Long.valueOf(insertFilePos[1]); + + return insertPos <= currentPos; } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java index f5c3563f064..e257e2bee02 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java @@ -96,6 +96,12 @@ public class TestMySqlDebeziumAvroPayload { payload = new MySqlDebeziumAvroPayload(lateRecord, "00000.222"); mergedRecord = payload.combineAndGetUpdateValue(existingRecord, avroSchema); validateRecord(mergedRecord, 1, Operation.INSERT, "00001.111"); + + GenericRecord originalRecord = createRecord(1, Operation.INSERT, "00000.23"); + payload = new MySqlDebeziumAvroPayload(originalRecord, "00000.23"); + updateRecord = createRecord(1, Operation.UPDATE, "00000.123"); + mergedRecord = payload.combineAndGetUpdateValue(updateRecord, avroSchema); + validateRecord(mergedRecord, 1, Operation.UPDATE, "00000.123"); } @Test
