This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 313c051b6 [flink][kafka-cdc] Kafka cdc doesn't need to check table's
primary keys (#1867)
313c051b6 is described below
commit 313c051b6ffe4588426d53667a31a3354a6ed535
Author: yuzelin <[email protected]>
AuthorDate: Wed Aug 23 13:38:13 2023 +0800
[flink][kafka-cdc] Kafka cdc doesn't need to check table's primary keys
(#1867)
---
.../flink/sink/cdc/RichCdcMultiplexRecordEventParser.java | 13 ++-----------
1 file changed, 2 insertions(+), 11 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index ab52da8a1..5153664ad 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -71,7 +71,7 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
public void setRawEvent(RichCdcMultiplexRecord record) {
this.record = record;
this.currentTable = record.tableName();
- this.shouldSynchronizeCurrentTable =
shouldSynchronizeCurrentTable(record.primaryKeys());
+ this.shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
if (shouldSynchronizeCurrentTable) {
this.currentParser = parsers.computeIfAbsent(currentTable, t ->
new RichEventParser());
this.currentParser.setRawEvent(record.toRichCdcRecord());
@@ -106,7 +106,7 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
return Optional.empty();
}
- private boolean shouldSynchronizeCurrentTable(List<String> primaryKeys) {
+ private boolean shouldSynchronizeCurrentTable() {
if (includedTables.contains(currentTable)) {
return true;
}
@@ -130,15 +130,6 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
return false;
}
- if (primaryKeys.isEmpty()) {
- LOG.debug(
- "Didn't find primary keys from kafka topic's table schemas
for table '{}'. "
- + "This table won't be synchronized.",
- currentTable);
- excludedTables.add(currentTable);
- return false;
- }
-
includedTables.add(currentTable);
return true;
}