This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f8d4eac64 [INLONG-7161][Sort] Fix bug that Mysql connector only output
the latest record in snapshot stage for table without primary key (#7164)
f8d4eac64 is described below
commit f8d4eac641f7fb7ab1a1124b9fd4c7c1344b41fe
Author: Schnapps <[email protected]>
AuthorDate: Thu Jan 5 19:19:02 2023 +0800
[INLONG-7161][Sort] Fix bug that Mysql connector only output the latest
record in snapshot stage for table without primary key (#7164)
Co-authored-by: stingpeng <[email protected]>
---
.../sort/cdc/mysql/source/utils/RecordUtils.java | 31 ++++++++++++++++------
1 file changed, 23 insertions(+), 8 deletions(-)
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
index ef7ef4ca9..6944bd795 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
@@ -88,7 +88,8 @@ public class RecordUtils {
List<SourceRecord> sourceRecords,
SchemaNameAdjuster nameAdjuster) {
List<SourceRecord> normalizedRecords = new ArrayList<>();
- Map<Struct, SourceRecord> snapshotRecords = new HashMap<>();
+ Map<Struct, SourceRecord> snapshotRecordsWithKey = new HashMap<>();
+ List<SourceRecord> snapshotRecordsWithoutKey = new ArrayList<>();
List<SourceRecord> binlogRecords = new ArrayList<>();
if (!sourceRecords.isEmpty()) {
@@ -103,7 +104,11 @@ public class RecordUtils {
for (; i < sourceRecords.size(); i++) {
SourceRecord sourceRecord = sourceRecords.get(i);
if (!isHighWatermarkEvent(sourceRecord)) {
- snapshotRecords.put((Struct) sourceRecord.key(),
sourceRecord);
+ if (sourceRecord.key() == null) {
+ snapshotRecordsWithoutKey.add(sourceRecord);
+ } else {
+ snapshotRecordsWithKey.put((Struct)
sourceRecord.key(), sourceRecord);
+ }
} else {
highWatermark = sourceRecord;
i++;
@@ -130,8 +135,11 @@ public class RecordUtils {
String.format(
"The last record should be high watermark signal
event, but is %s",
highWatermark));
+
normalizedRecords =
- upsertBinlog(lowWatermark, highWatermark, snapshotRecords,
binlogRecords);
+ upsertBinlog(lowWatermark, highWatermark,
snapshotRecordsWithKey,
+ binlogRecords, snapshotRecordsWithoutKey);
+
}
return normalizedRecords;
}
@@ -139,8 +147,9 @@ public class RecordUtils {
private static List<SourceRecord> upsertBinlog(
SourceRecord lowWatermarkEvent,
SourceRecord highWatermarkEvent,
- Map<Struct, SourceRecord> snapshotRecords,
- List<SourceRecord> binlogRecords) {
+ Map<Struct, SourceRecord> snapshotRecordsWithKey,
+ List<SourceRecord> binlogRecords,
+ List<SourceRecord> snapshotRecordsWithoutKey) {
// upsert binlog events to snapshot events of split
if (!binlogRecords.isEmpty()) {
for (SourceRecord binlog : binlogRecords) {
@@ -169,10 +178,10 @@ public class RecordUtils {
binlog.key(),
binlog.valueSchema(),
envelope.read(after, source,
fetchTs));
- snapshotRecords.put(key, record);
+ snapshotRecordsWithKey.put(key, record);
break;
case DELETE:
- snapshotRecords.remove(key);
+ snapshotRecordsWithKey.remove(key);
break;
case READ:
throw new IllegalStateException(
@@ -188,7 +197,13 @@ public class RecordUtils {
final List<SourceRecord> normalizedRecords = new ArrayList<>();
normalizedRecords.add(lowWatermarkEvent);
-
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
+ if (!snapshotRecordsWithoutKey.isEmpty()) {
+ // for table without key, there is no need for binlog upsert
+ // because highWatermark equals to lowWatermark
+
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithoutKey));
+ } else {
+
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithKey.values()));
+ }
normalizedRecords.add(highWatermarkEvent);
return normalizedRecords;