This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 139f3bb667 [cdc] Optimize SyncDatabaseAction performance by removing listTables calls (#5956)
139f3bb667 is described below

commit 139f3bb66720d11f87f6335501a63682fd6f5bdd
Author: big face cat <[email protected]>
AuthorDate: Sun May 24 10:24:55 2026 +0800

    [cdc] Optimize SyncDatabaseAction performance by removing listTables calls (#5956)
---
 .../paimon/flink/action/cdc/SyncDatabaseActionBase.java | 13 ++++---------
 .../sink/cdc/CdcDynamicTableParsingProcessFunction.java | 17 +++++++++++------
 .../sink/cdc/RichCdcMultiplexRecordEventParser.java     | 11 +++--------
 3 files changed, 18 insertions(+), 23 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 36a9be033e..d7393485ad 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -38,10 +38,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
@@ -219,11 +217,9 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
                         tablePrefix,
                         tableSuffix,
                         tableMapping);
-        Set<String> createdTables;
-        try {
-            createdTables = new HashSet<>(catalog.listTables(database));
-        } catch (Catalog.DatabaseNotExistException e) {
-            throw new RuntimeException(e);
+        List<String> databases = catalog.listDatabases();
+        if (!databases.contains(database)) {
+            throw new RuntimeException(new Catalog.DatabaseNotExistException(database));
         }
         return () ->
                 new RichCdcMultiplexRecordEventParser(
@@ -232,8 +228,7 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
                         tblExcludingPattern,
                         dbIncludingPattern,
                         dbExcludingPattern,
-                        tableNameConverter,
-                        createdTables);
+                        tableNameConverter);
     }
 
     protected abstract boolean requirePrimaryKeys();
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index d56963731f..f6c1b4733f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink.cdc;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
 
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.typeinfo.TypeHint;
@@ -103,12 +104,16 @@ public class CdcDynamicTableParsingProcessFunction<T> extends ProcessFunction<T,
                         schema -> {
                             Identifier identifier = new Identifier(database, tableName);
                             try {
-                                catalog.createTable(identifier, schema, true);
-                            } catch (Exception e) {
-                                LOG.error(
-                                        "Cannot create newly added Paimon table {}",
-                                        identifier.getFullName(),
-                                        e);
+                                Table ignore = catalog.getTable(identifier);
+                            } catch (Catalog.TableNotExistException e) {
+                                try {
+                                    catalog.createTable(identifier, schema, true);
+                                } catch (Exception ex) {
+                                    LOG.error(
+                                            "Cannot create newly added Paimon table {}",
+                                            identifier.getFullName(),
+                                            ex);
+                                }
                             }
                         });
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 4ae4da6706..f219aec903 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -50,7 +50,6 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
     @Nullable private final Pattern dbIncludingPattern;
     @Nullable private final Pattern dbExcludingPattern;
     private final TableNameConverter tableNameConverter;
-    private final Set<String> createdTables;
 
     private final Map<String, RichEventParser> parsers = new HashMap<>();
     private final Set<String> includedTables = new HashSet<>();
@@ -66,7 +65,7 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
     private RichEventParser currentParser;
 
     public RichCdcMultiplexRecordEventParser(boolean caseSensitive) {
-        this(null, null, null, null, null, new TableNameConverter(caseSensitive), new HashSet<>());
+        this(null, null, null, null, null, new TableNameConverter(caseSensitive));
     }
 
     public RichCdcMultiplexRecordEventParser(
@@ -75,15 +74,13 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
             @Nullable Pattern tblExcludingPattern,
             @Nullable Pattern dbIncludingPattern,
             @Nullable Pattern dbExcludingPattern,
-            TableNameConverter tableNameConverter,
-            Set<String> createdTables) {
+            TableNameConverter tableNameConverter) {
         this.schemaBuilder = schemaBuilder;
         this.tblIncludingPattern = tblIncludingPattern;
         this.tblExcludingPattern = tblExcludingPattern;
         this.dbIncludingPattern = dbIncludingPattern;
         this.dbExcludingPattern = dbExcludingPattern;
         this.tableNameConverter = tableNameConverter;
-        this.createdTables = createdTables;
     }
 
     @Override
@@ -201,8 +198,6 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
     }
 
     private boolean shouldCreateCurrentTable() {
-        return shouldSynchronizeCurrentTable
-                && !record.cdcSchema().fields().isEmpty()
-                && createdTables.add(parseTableName());
+        return shouldSynchronizeCurrentTable && !record.cdcSchema().fields().isEmpty();
     }
 }

Reply via email to