danny0405 commented on code in PR #9475:
URL: https://github.com/apache/hudi/pull/9475#discussion_r1306799781
##########
hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java:
##########
@@ -66,8 +66,31 @@ protected boolean shouldPickCurrentRecord(IndexedRecord currentRecord, IndexedRe
new HoodieDebeziumAvroPayloadException(String.format("%s cannot be null in insert record: %s",
DebeziumConstants.ADDED_SEQ_COL_NAME, insertRecord)));
Option<String> currentSourceSeqOpt = extractSeq(currentRecord);
- // Pick the current value in storage only if its Seq (file+pos) is latest
- // compared to the Seq (file+pos) of the insert value
- return currentSourceSeqOpt.isPresent() && insertSourceSeq.compareTo(currentSourceSeqOpt.get()) < 0;
+
+ // handle bootstrap case
+ if (!currentSourceSeqOpt.isPresent()) {
+ return false;
+ }
+
+ // Seq is file+pos string like "001.000010", getting [001,000010] from it
+ String[] currentFilePos = currentSourceSeqOpt.get().split("\\.");
+ String[] insertFilePos = insertSourceSeq.split("\\.");
+
+ long currentFileNum = Long.valueOf(currentFilePos[0]);
+ long insertFileNum = Long.valueOf(insertFilePos[0]);
+
+ if (insertFileNum < currentFileNum) {
+ // pick the current value
+ return true;
+ } else if (insertFileNum > currentFileNum) {
+ // pick the insert value
+ return false;
+ }
+
+ // file name is the same, compare the position in the file
+ Long currentPos = Long.valueOf(currentFilePos[1]);
+ Long insertPos = Long.valueOf(insertFilePos[1]);
+
+ return insertPos.compareTo(currentPos) <= 0;
Review Comment:
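Could this be simplified to `return insertPos <= currentPos;`? The `<=` operator unboxes both `Long` values, so the explicit `compareTo` call is not needed.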
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]