This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit dabfef7410d2156d60359c9a8a0daff702168eb1
Author: Schnapps <[email protected]>
AuthorDate: Fri Jan 6 16:21:28 2023 +0800

    [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (Supplement) (#7175)

    Co-authored-by: stingpeng <[email protected]>
---
 .../inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
index 2a455de98..2e6a9f85d 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
@@ -188,7 +188,8 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
         if (isDataChangeRecord(sourceRecord)) {
             TableId tableId = getTableId(sourceRecord);
             BinlogOffset position = getBinlogPosition(sourceRecord);
-            if (hasEnterPureBinlogPhase(tableId, position)) {
+            // source record has no primary need no comparing for binlog position
+            if (hasEnterPureBinlogPhase(tableId, position) || sourceRecord.key() == null) {
                 return true;
             }
             // only the table who captured snapshot splits need to filter
