This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c4e921059 [flink][mysql-cdc] MySQL CDC should ignore new tables that 
don't have primary keys or should be excluded (#1567)
c4e921059 is described below

commit c4e921059506f03796a9b6fbc26ed2397bd98aa5
Author: yuzelin <[email protected]>
AuthorDate: Tue Jul 18 13:45:53 2023 +0800

    [flink][mysql-cdc] MySQL CDC should ignore new tables that don't have 
primary keys or should be excluded (#1567)
---
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    | 96 ++++++++++++++++++++--
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |  4 +
 .../action/cdc/mysql/MySqlTableSchemaBuilder.java  | 38 ++-------
 .../cdc/RichCdcMultiplexRecordEventParser.java     | 10 +--
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   | 87 +++++++++++++++++++-
 .../src/test/resources/mysql/setup.sql             | 17 +++-
 6 files changed, 201 insertions(+), 51 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index e66ab96ae..f5a3d8b53 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -40,10 +40,16 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeRefe
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
+import com.alibaba.druid.sql.SQLUtils;
+import com.alibaba.druid.sql.ast.SQLStatement;
+import 
com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
+import com.alibaba.druid.util.JdbcConstants;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.LocalDateTime;
@@ -53,9 +59,12 @@ import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -67,14 +76,20 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
     private final ObjectMapper objectMapper = new ObjectMapper();
     private final ZoneId serverTimeZone;
     private final boolean caseSensitive;
-
     private final TableNameConverter tableNameConverter;
     private final List<ComputedColumn> computedColumns;
-    private final NewTableSchemaBuilder<String> schemaBuilder;
+    private final NewTableSchemaBuilder<MySqlCreateTableStatement> 
schemaBuilder;
+    @Nullable private final Pattern includingPattern;
+    @Nullable private final Pattern excludingPattern;
+    private final Set<String> includedTables = new HashSet<>();
+    private final Set<String> excludedTables = new HashSet<>();
     private final boolean convertTinyint1ToBool;
 
     private JsonNode root;
     private JsonNode payload;
+    // NOTE: current table name is not converted by tableNameConverter
+    private String currentTable;
+    private boolean shouldSynchronizeCurrentTable;
 
     public MySqlDebeziumJsonEventParser(
             ZoneId serverTimeZone,
@@ -87,6 +102,8 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                 computedColumns,
                 new TableNameConverter(caseSensitive),
                 ddl -> Optional.empty(),
+                null,
+                null,
                 convertTinyint1ToBool);
     }
 
@@ -94,7 +111,9 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
             ZoneId serverTimeZone,
             boolean caseSensitive,
             TableNameConverter tableNameConverter,
-            NewTableSchemaBuilder<String> schemaBuilder,
+            NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder,
+            @Nullable Pattern includingPattern,
+            @Nullable Pattern excludingPattern,
             boolean convertTinyint1ToBool) {
         this(
                 serverTimeZone,
@@ -102,6 +121,8 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                 Collections.emptyList(),
                 tableNameConverter,
                 schemaBuilder,
+                includingPattern,
+                excludingPattern,
                 convertTinyint1ToBool);
     }
 
@@ -110,13 +131,17 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
             boolean caseSensitive,
             List<ComputedColumn> computedColumns,
             TableNameConverter tableNameConverter,
-            NewTableSchemaBuilder<String> schemaBuilder,
+            NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder,
+            @Nullable Pattern includingPattern,
+            @Nullable Pattern excludingPattern,
             boolean convertTinyint1ToBool) {
         this.serverTimeZone = serverTimeZone;
         this.caseSensitive = caseSensitive;
         this.computedColumns = computedColumns;
         this.tableNameConverter = tableNameConverter;
         this.schemaBuilder = schemaBuilder;
+        this.includingPattern = includingPattern;
+        this.excludingPattern = excludingPattern;
         this.convertTinyint1ToBool = convertTinyint1ToBool;
     }
 
@@ -125,6 +150,8 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
         try {
             root = objectMapper.readValue(rawEvent, JsonNode.class);
             payload = root.get("payload");
+            currentTable = payload.get("source").get("table").asText();
+            shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -132,8 +159,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
 
     @Override
     public String parseTableName() {
-        String tableName = payload.get("source").get("table").asText();
-        return tableNameConverter.convert(tableName);
+        return tableNameConverter.convert(currentTable);
     }
 
     private boolean isSchemaChange() {
@@ -142,7 +168,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
 
     @Override
     public List<DataField> parseSchemaChange() {
-        if (!isSchemaChange()) {
+        if (!shouldSynchronizeCurrentTable || !isSchemaChange()) {
             return Collections.emptyList();
         }
 
@@ -194,6 +220,10 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
 
     @Override
     public Optional<Schema> parseNewTable() {
+        if (!shouldSynchronizeCurrentTable) {
+            return Optional.empty();
+        }
+
         JsonNode historyRecord = payload.get("historyRecord");
         if (historyRecord == null) {
             return Optional.empty();
@@ -205,7 +235,26 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
             if (Strings.isNullOrEmpty(ddl)) {
                 return Optional.empty();
             }
-            return schemaBuilder.build(ddl);
+
+            SQLStatement statement = SQLUtils.parseSingleStatement(ddl, 
JdbcConstants.MYSQL);
+            if (!(statement instanceof MySqlCreateTableStatement)) {
+                return Optional.empty();
+            }
+
+            MySqlCreateTableStatement createTableStatement = 
(MySqlCreateTableStatement) statement;
+            List<String> primaryKeys = 
createTableStatement.getPrimaryKeyNames();
+            String tableName = createTableStatement.getTableName();
+            if (primaryKeys.isEmpty()) {
+                LOG.debug(
+                        "Didn't find primary keys from MySQL DDL for table 
'{}'. "
+                                + "This table won't be synchronized.",
+                        tableName);
+                excludedTables.add(tableName);
+                shouldSynchronizeCurrentTable = false;
+                return Optional.empty();
+            }
+
+            return schemaBuilder.build(createTableStatement);
         } catch (Exception e) {
             LOG.info("Failed to parse history record for schema changes", e);
             return Optional.empty();
@@ -214,7 +263,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
 
     @Override
     public List<CdcRecord> parseRecords() {
-        if (isSchemaChange()) {
+        if (!shouldSynchronizeCurrentTable || isSchemaChange()) {
             return Collections.emptyList();
         }
         List<CdcRecord> records = new ArrayList<>();
@@ -393,4 +442,33 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
         }
         return keyCaseInsensitive;
     }
+
+    private boolean shouldSynchronizeCurrentTable() {
+        if (excludedTables.contains(currentTable)) {
+            return false;
+        }
+
+        if (includedTables.contains(currentTable)) {
+            return true;
+        }
+
+        boolean shouldSynchronize = true;
+        if (includingPattern != null) {
+            shouldSynchronize = 
includingPattern.matcher(currentTable).matches();
+        }
+        if (excludingPattern != null) {
+            shouldSynchronize =
+                    shouldSynchronize && 
!excludingPattern.matcher(currentTable).matches();
+        }
+        if (!shouldSynchronize) {
+            LOG.debug(
+                    "Source table {} won't be synchronized because it was 
excluded. ",
+                    currentTable);
+            excludedTables.add(currentTable);
+            return false;
+        }
+
+        includedTables.add(currentTable);
+        return true;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 3901d5c5e..c931fca31 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -238,6 +238,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
         ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : 
ZoneId.of(serverTimeZone);
         MySqlTableSchemaBuilder schemaBuilder =
                 new MySqlTableSchemaBuilder(tableConfig, caseSensitive);
+        Pattern includingPattern = this.includingPattern;
+        Pattern excludingPattern = this.excludingPattern;
         Boolean convertTinyint1ToBool = 
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
         EventParser.Factory<String> parserFactory =
                 () ->
@@ -246,6 +248,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                                 caseSensitive,
                                 tableNameConverter,
                                 schemaBuilder,
+                                includingPattern,
+                                excludingPattern,
                                 convertTinyint1ToBool);
 
         String database = this.database;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index ee89a5b3d..0d9561fef 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -22,20 +22,15 @@ import 
org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataType;
 
-import com.alibaba.druid.sql.SQLUtils;
 import com.alibaba.druid.sql.ast.SQLDataType;
 import com.alibaba.druid.sql.ast.SQLExpr;
 import com.alibaba.druid.sql.ast.SQLName;
-import com.alibaba.druid.sql.ast.SQLStatement;
 import com.alibaba.druid.sql.ast.expr.SQLCharExpr;
 import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
 import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
 import com.alibaba.druid.sql.ast.statement.SQLTableElement;
 import 
com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
-import com.alibaba.druid.util.JdbcConstants;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -47,9 +42,7 @@ import static 
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CO
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Schema builder for MySQL cdc. */
-public class MySqlTableSchemaBuilder implements NewTableSchemaBuilder<String> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlTableSchemaBuilder.class);
+public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<MySqlCreateTableStatement> {
 
     private final Map<String, String> tableConfig;
     private final boolean caseSensitive;
@@ -60,28 +53,8 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<String> {
     }
 
     @Override
-    public Optional<Schema> build(String ddl) {
-        SQLStatement statement = SQLUtils.parseSingleStatement(ddl, 
JdbcConstants.MYSQL);
-
-        if (!(statement instanceof MySqlCreateTableStatement)) {
-            return Optional.empty();
-        }
-
-        MySqlCreateTableStatement createTableStatement = 
(MySqlCreateTableStatement) statement;
-        String tableName = createTableStatement.getTableName();
-
-        List<String> primaryKeys = createTableStatement.getPrimaryKeyNames();
-        if (primaryKeys.isEmpty()) {
-            LOG.debug(
-                    "Didn't find primary keys from MySQL DDL for table '{}'. "
-                            + "This table won't be synchronized.",
-                    tableName);
-            // TODO: for non-pk tables, we should not handle their records 
because we haven't
-            // created them here. Fix in 'MySqlDebeziumJsonEventParser'
-            return Optional.empty();
-        }
-
-        List<SQLTableElement> columns = 
createTableStatement.getTableElementList();
+    public Optional<Schema> build(MySqlCreateTableStatement statement) {
+        List<SQLTableElement> columns = statement.getTableElementList();
         LinkedHashMap<String, Tuple2<DataType, String>> fields = new 
LinkedHashMap<>();
 
         for (SQLTableElement element : columns) {
@@ -113,6 +86,8 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<String> {
             }
         }
 
+        List<String> primaryKeys = statement.getPrimaryKeyNames();
+
         if (!caseSensitive) {
             LinkedHashMap<String, Tuple2<DataType, String>> tmp = new 
LinkedHashMap<>();
             for (Map.Entry<String, Tuple2<DataType, String>> entry : 
fields.entrySet()) {
@@ -121,7 +96,7 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<String> {
                         !tmp.containsKey(fieldName.toLowerCase()),
                         "Duplicate key '%s' in table '%s' appears when 
converting fields map keys to case-insensitive form.",
                         fieldName,
-                        tableName);
+                        statement.getTableName());
                 tmp.put(fieldName.toLowerCase(), entry.getValue());
             }
             fields = tmp;
@@ -135,7 +110,6 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<String> {
         for (Map.Entry<String, Tuple2<DataType, String>> entry : 
fields.entrySet()) {
             builder.column(entry.getKey(), entry.getValue().f0, 
entry.getValue().f1);
         }
-
         Schema schema = builder.primaryKey(primaryKeys).build();
 
         return Optional.of(schema);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 326dad0d4..ab52da8a1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -114,15 +114,15 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
             return false;
         }
 
-        boolean shouldSynchroniz = true;
+        boolean shouldSynchronize = true;
         if (includingPattern != null) {
-            shouldSynchroniz = 
includingPattern.matcher(currentTable).matches();
+            shouldSynchronize = 
includingPattern.matcher(currentTable).matches();
         }
         if (excludingPattern != null) {
-            shouldSynchroniz =
-                    shouldSynchroniz && 
!excludingPattern.matcher(currentTable).matches();
+            shouldSynchronize =
+                    shouldSynchronize && 
!excludingPattern.matcher(currentTable).matches();
         }
-        if (!shouldSynchroniz) {
+        if (!shouldSynchronize) {
             LOG.debug(
                     "Source table {} won't be synchronized because it was 
excluded. ",
                     currentTable);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 635f2053a..1606cab2a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -768,6 +768,82 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         testNewlyAddedTable(1, true, true, 
"paimon_sync_database_newly_added_tables_4");
     }
 
+    @Test
+    public void testAddIgnoredTable() throws Exception {
+        String mySqlDatabase = "paimon_sync_database_add_ignored_table";
+
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", mySqlDatabase);
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1));
+        tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(2) + 
2));
+
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        false,
+                        null,
+                        null,
+                        "t.+",
+                        ".*a$",
+                        Collections.emptyMap(),
+                        tableConfig,
+                        COMBINED);
+        action.build(env);
+        JobClient client = env.executeAsync();
+        waitJobRunning(client);
+
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(mySqlDatabase),
+                                MYSQL_CONTAINER.getUsername(),
+                                MYSQL_CONTAINER.getPassword());
+                Statement statement = conn.createStatement()) {
+
+            FileStoreTable table1 = getFileStoreTable("t1");
+
+            statement.executeUpdate("USE " + mySqlDatabase);
+            statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
+            statement.executeUpdate("INSERT INTO a VALUES (1, 'one')");
+
+            // make sure the job steps into incremental phase
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(10)},
+                            new String[] {"k", "v1"});
+            List<String> primaryKeys = Collections.singletonList("k");
+            waitForResult(Collections.singletonList("+I[1, one]"), table1, 
rowType, primaryKeys);
+
+            // create new tables at runtime
+            // synchronized table: t2
+            statement.executeUpdate("CREATE TABLE t2 (k INT, v1 VARCHAR(10), 
PRIMARY KEY (k))");
+            statement.executeUpdate("INSERT INTO t2 VALUES (1, 'Hi')");
+            // not synchronized tables: ta, t3
+            statement.executeUpdate("CREATE TABLE ta (k INT, v1 VARCHAR(10), 
PRIMARY KEY (k))");
+            statement.executeUpdate("INSERT INTO ta VALUES (1, 'Apache')");
+            statement.executeUpdate("CREATE TABLE t3 (k INT, v1 VARCHAR(10))");
+            statement.executeUpdate("INSERT INTO t3 VALUES (1, 'Paimon')");
+
+            statement.executeUpdate("INSERT INTO t1 VALUES (2, 'two')");
+            waitForResult(Arrays.asList("+I[1, one]", "+I[2, two]"), table1, 
rowType, primaryKeys);
+
+            // check tables
+            assertTableExists(Arrays.asList("t1", "t2"));
+            assertTableNotExists(Arrays.asList("a", "ta", "t3"));
+
+            FileStoreTable newTable = getFileStoreTable("t2");
+            waitForResult(Collections.singletonList("+I[1, Hi]"), newTable, 
rowType, primaryKeys);
+        }
+    }
+
     public void testNewlyAddedTable(
             int numOfNewlyAddedTables,
             boolean testSavepointRecovery,
@@ -1094,14 +1170,17 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         waitForResult(expected, table, rowType, Arrays.asList("pk"));
     }
 
+    private Catalog catalog() {
+        return CatalogFactory.createCatalog(CatalogContext.create(new 
Path(warehouse)));
+    }
+
     private FileStoreTable getFileStoreTable(String tableName) throws 
Exception {
-        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
         Identifier identifier = Identifier.create(database, tableName);
-        return (FileStoreTable) catalog.getTable(identifier);
+        return (FileStoreTable) catalog().getTable(identifier);
     }
 
     private void assertTableExists(List<String> tableNames) {
-        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        Catalog catalog = catalog();
         for (String tableName : tableNames) {
             Identifier identifier = Identifier.create(database, tableName);
             assertThat(catalog.tableExists(identifier)).isTrue();
@@ -1109,7 +1188,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
     }
 
     private void assertTableNotExists(List<String> tableNames) {
-        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        Catalog catalog = catalog();
         for (String tableName : tableNames) {
             Identifier identifier = Identifier.create(database, tableName);
             assertThat(catalog.tableExists(identifier)).isFalse();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index 5d9459273..715a6b66a 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -568,6 +568,21 @@ CREATE TABLE t2 (
     PRIMARY KEY (k1, k2)
 );
 
+CREATE DATABASE paimon_sync_database_add_ignored_table;
+USE paimon_sync_database_add_ignored_table;
+
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE a (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
 CREATE DATABASE paimon_sync_table_tinyint;
 USE paimon_sync_table_tinyint;
 
@@ -592,4 +607,4 @@ CREATE TABLE schema_evolution_5 (
     v1 VARCHAR(10) comment  'v1',
     v2 TINYINT(1) comment 'tinyint(1)',
     PRIMARY KEY (_id)
-);
\ No newline at end of file
+);

Reply via email to