This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 313c051b6 [flink][kafka-cdc] Kafka cdc doesn't need to check table's 
primary keys (#1867)
313c051b6 is described below

commit 313c051b6ffe4588426d53667a31a3354a6ed535
Author: yuzelin <[email protected]>
AuthorDate: Wed Aug 23 13:38:13 2023 +0800

    [flink][kafka-cdc] Kafka cdc doesn't need to check table's primary keys 
(#1867)
---
 .../flink/sink/cdc/RichCdcMultiplexRecordEventParser.java   | 13 ++-----------
 1 file changed, 2 insertions(+), 11 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index ab52da8a1..5153664ad 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -71,7 +71,7 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
     public void setRawEvent(RichCdcMultiplexRecord record) {
         this.record = record;
         this.currentTable = record.tableName();
-        this.shouldSynchronizeCurrentTable = 
shouldSynchronizeCurrentTable(record.primaryKeys());
+        this.shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
         if (shouldSynchronizeCurrentTable) {
             this.currentParser = parsers.computeIfAbsent(currentTable, t -> 
new RichEventParser());
             this.currentParser.setRawEvent(record.toRichCdcRecord());
@@ -106,7 +106,7 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
         return Optional.empty();
     }
 
-    private boolean shouldSynchronizeCurrentTable(List<String> primaryKeys) {
+    private boolean shouldSynchronizeCurrentTable() {
         if (includedTables.contains(currentTable)) {
             return true;
         }
@@ -130,15 +130,6 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
             return false;
         }
 
-        if (primaryKeys.isEmpty()) {
-            LOG.debug(
-                    "Didn't find primary keys from kafka topic's table schemas 
for table '{}'. "
-                            + "This table won't be synchronized.",
-                    currentTable);
-            excludedTables.add(currentTable);
-            return false;
-        }
-
         includedTables.add(currentTable);
         return true;
     }

Reply via email to