This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 83cdca8bc5d6beabcd60b8f8717a3b0133920d67
Author: Sandeep Parwal <[email protected]>
AuthorDate: Mon Sep 4 19:36:03 2023 -0700

    [HUDI-6766] Fixing mysql debezium data loss  (#9475)
---
 .../model/debezium/MySqlDebeziumAvroPayload.java   | 29 +++++++++++++++++++---
 .../debezium/TestMySqlDebeziumAvroPayload.java     |  6 +++++
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java
index a0a6304fa40..fceafee554c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java
@@ -66,8 +66,31 @@ public class MySqlDebeziumAvroPayload extends AbstractDebeziumAvroPayload {
             new HoodieDebeziumAvroPayloadException(String.format("%s cannot be 
null in insert record: %s",
                 DebeziumConstants.ADDED_SEQ_COL_NAME, insertRecord)));
     Option<String> currentSourceSeqOpt = extractSeq(currentRecord);
-    // Pick the current value in storage only if its Seq (file+pos) is latest
-    // compared to the Seq (file+pos) of the insert value
-    return currentSourceSeqOpt.isPresent() && 
insertSourceSeq.compareTo(currentSourceSeqOpt.get()) < 0;
+
+    // handle bootstrap case
+    if (!currentSourceSeqOpt.isPresent()) {
+      return false;
+    }
+
+    // Seq is file+pos string like "001.000010", getting [001,000010] from it
+    String[] currentFilePos = currentSourceSeqOpt.get().split("\\.");
+    String[] insertFilePos = insertSourceSeq.split("\\.");
+
+    long currentFileNum = Long.valueOf(currentFilePos[0]);
+    long insertFileNum = Long.valueOf(insertFilePos[0]);
+
+    if (insertFileNum < currentFileNum) {
+      // pick the current value
+      return true;
+    } else if (insertFileNum > currentFileNum) {
+      // pick the insert value
+      return false;
+    }
+
+    // file name is the same, compare the position in the file
+    Long currentPos = Long.valueOf(currentFilePos[1]);
+    Long insertPos = Long.valueOf(insertFilePos[1]);
+
+    return insertPos <= currentPos;
   }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java
index f5c3563f064..e257e2bee02 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/debezium/TestMySqlDebeziumAvroPayload.java
@@ -96,6 +96,12 @@ public class TestMySqlDebeziumAvroPayload {
     payload = new MySqlDebeziumAvroPayload(lateRecord, "00000.222");
     mergedRecord = payload.combineAndGetUpdateValue(existingRecord, 
avroSchema);
     validateRecord(mergedRecord, 1, Operation.INSERT, "00001.111");
+
+    GenericRecord originalRecord = createRecord(1, Operation.INSERT, 
"00000.23");
+    payload = new MySqlDebeziumAvroPayload(originalRecord, "00000.23");
+    updateRecord = createRecord(1, Operation.UPDATE, "00000.123");
+    mergedRecord = payload.combineAndGetUpdateValue(updateRecord, avroSchema);
+    validateRecord(mergedRecord, 1, Operation.UPDATE, "00000.123");
   }
 
   @Test

Reply via email to