This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 139f3bb667 [cdc] Optimize SyncDatabaseAction performance by removing listTables calls (#5956)
139f3bb667 is described below
commit 139f3bb66720d11f87f6335501a63682fd6f5bdd
Author: big face cat <[email protected]>
AuthorDate: Sun May 24 10:24:55 2026 +0800
[cdc] Optimize SyncDatabaseAction performance by removing listTables calls (#5956)
---
.../paimon/flink/action/cdc/SyncDatabaseActionBase.java | 13 ++++---------
.../sink/cdc/CdcDynamicTableParsingProcessFunction.java | 17 +++++++++++------
.../sink/cdc/RichCdcMultiplexRecordEventParser.java | 11 +++--------
3 files changed, 18 insertions(+), 23 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 36a9be033e..d7393485ad 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -38,10 +38,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
@@ -219,11 +217,9 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
tablePrefix,
tableSuffix,
tableMapping);
- Set<String> createdTables;
- try {
- createdTables = new HashSet<>(catalog.listTables(database));
- } catch (Catalog.DatabaseNotExistException e) {
- throw new RuntimeException(e);
+ List<String> databases = catalog.listDatabases();
+ if (!databases.contains(database)) {
+ throw new RuntimeException(new
Catalog.DatabaseNotExistException(database));
}
return () ->
new RichCdcMultiplexRecordEventParser(
@@ -232,8 +228,7 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
tblExcludingPattern,
dbIncludingPattern,
dbExcludingPattern,
- tableNameConverter,
- createdTables);
+ tableNameConverter);
}
protected abstract boolean requirePrimaryKeys();
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index d56963731f..f6c1b4733f 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.TypeHint;
@@ -103,12 +104,16 @@ public class CdcDynamicTableParsingProcessFunction<T>
extends ProcessFunction<T,
schema -> {
Identifier identifier = new Identifier(database,
tableName);
try {
- catalog.createTable(identifier, schema, true);
- } catch (Exception e) {
- LOG.error(
- "Cannot create newly added Paimon
table {}",
- identifier.getFullName(),
- e);
+ Table ignore = catalog.getTable(identifier);
+ } catch (Catalog.TableNotExistException e) {
+ try {
+ catalog.createTable(identifier, schema,
true);
+ } catch (Exception ex) {
+ LOG.error(
+ "Cannot create newly added Paimon
table {}",
+ identifier.getFullName(),
+ ex);
+ }
}
});
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 4ae4da6706..f219aec903 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -50,7 +50,6 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
@Nullable private final Pattern dbIncludingPattern;
@Nullable private final Pattern dbExcludingPattern;
private final TableNameConverter tableNameConverter;
- private final Set<String> createdTables;
private final Map<String, RichEventParser> parsers = new HashMap<>();
private final Set<String> includedTables = new HashSet<>();
@@ -66,7 +65,7 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
private RichEventParser currentParser;
public RichCdcMultiplexRecordEventParser(boolean caseSensitive) {
- this(null, null, null, null, null, new
TableNameConverter(caseSensitive), new HashSet<>());
+ this(null, null, null, null, null, new
TableNameConverter(caseSensitive));
}
public RichCdcMultiplexRecordEventParser(
@@ -75,15 +74,13 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
@Nullable Pattern tblExcludingPattern,
@Nullable Pattern dbIncludingPattern,
@Nullable Pattern dbExcludingPattern,
- TableNameConverter tableNameConverter,
- Set<String> createdTables) {
+ TableNameConverter tableNameConverter) {
this.schemaBuilder = schemaBuilder;
this.tblIncludingPattern = tblIncludingPattern;
this.tblExcludingPattern = tblExcludingPattern;
this.dbIncludingPattern = dbIncludingPattern;
this.dbExcludingPattern = dbExcludingPattern;
this.tableNameConverter = tableNameConverter;
- this.createdTables = createdTables;
}
@Override
@@ -201,8 +198,6 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
}
private boolean shouldCreateCurrentTable() {
- return shouldSynchronizeCurrentTable
- && !record.cdcSchema().fields().isEmpty()
- && createdTables.add(parseTableName());
+ return shouldSynchronizeCurrentTable &&
!record.cdcSchema().fields().isEmpty();
}
}