This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c4e921059 [flink][mysql-cdc] MySQL CDC should ignore new tables that
don't have primary keys or should be excluded (#1567)
c4e921059 is described below
commit c4e921059506f03796a9b6fbc26ed2397bd98aa5
Author: yuzelin <[email protected]>
AuthorDate: Tue Jul 18 13:45:53 2023 +0800
[flink][mysql-cdc] MySQL CDC should ignore new tables that don't have
primary keys or should be excluded (#1567)
---
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 96 ++++++++++++++++++++--
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 4 +
.../action/cdc/mysql/MySqlTableSchemaBuilder.java | 38 ++-------
.../cdc/RichCdcMultiplexRecordEventParser.java | 10 +--
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 87 +++++++++++++++++++-
.../src/test/resources/mysql/setup.sql | 17 +++-
6 files changed, 201 insertions(+), 51 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index e66ab96ae..f5a3d8b53 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -40,10 +40,16 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeRefe
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import com.alibaba.druid.sql.SQLUtils;
+import com.alibaba.druid.sql.ast.SQLStatement;
+import
com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
+import com.alibaba.druid.util.JdbcConstants;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
@@ -53,9 +59,12 @@ import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -67,14 +76,20 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
private final ObjectMapper objectMapper = new ObjectMapper();
private final ZoneId serverTimeZone;
private final boolean caseSensitive;
-
private final TableNameConverter tableNameConverter;
private final List<ComputedColumn> computedColumns;
- private final NewTableSchemaBuilder<String> schemaBuilder;
+ private final NewTableSchemaBuilder<MySqlCreateTableStatement>
schemaBuilder;
+ @Nullable private final Pattern includingPattern;
+ @Nullable private final Pattern excludingPattern;
+ private final Set<String> includedTables = new HashSet<>();
+ private final Set<String> excludedTables = new HashSet<>();
private final boolean convertTinyint1ToBool;
private JsonNode root;
private JsonNode payload;
+ // NOTE: current table name is not converted by tableNameConverter
+ private String currentTable;
+ private boolean shouldSynchronizeCurrentTable;
public MySqlDebeziumJsonEventParser(
ZoneId serverTimeZone,
@@ -87,6 +102,8 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
computedColumns,
new TableNameConverter(caseSensitive),
ddl -> Optional.empty(),
+ null,
+ null,
convertTinyint1ToBool);
}
@@ -94,7 +111,9 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
ZoneId serverTimeZone,
boolean caseSensitive,
TableNameConverter tableNameConverter,
- NewTableSchemaBuilder<String> schemaBuilder,
+ NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder,
+ @Nullable Pattern includingPattern,
+ @Nullable Pattern excludingPattern,
boolean convertTinyint1ToBool) {
this(
serverTimeZone,
@@ -102,6 +121,8 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
Collections.emptyList(),
tableNameConverter,
schemaBuilder,
+ includingPattern,
+ excludingPattern,
convertTinyint1ToBool);
}
@@ -110,13 +131,17 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
boolean caseSensitive,
List<ComputedColumn> computedColumns,
TableNameConverter tableNameConverter,
- NewTableSchemaBuilder<String> schemaBuilder,
+ NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder,
+ @Nullable Pattern includingPattern,
+ @Nullable Pattern excludingPattern,
boolean convertTinyint1ToBool) {
this.serverTimeZone = serverTimeZone;
this.caseSensitive = caseSensitive;
this.computedColumns = computedColumns;
this.tableNameConverter = tableNameConverter;
this.schemaBuilder = schemaBuilder;
+ this.includingPattern = includingPattern;
+ this.excludingPattern = excludingPattern;
this.convertTinyint1ToBool = convertTinyint1ToBool;
}
@@ -125,6 +150,8 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
try {
root = objectMapper.readValue(rawEvent, JsonNode.class);
payload = root.get("payload");
+ currentTable = payload.get("source").get("table").asText();
+ shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -132,8 +159,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
@Override
public String parseTableName() {
- String tableName = payload.get("source").get("table").asText();
- return tableNameConverter.convert(tableName);
+ return tableNameConverter.convert(currentTable);
}
private boolean isSchemaChange() {
@@ -142,7 +168,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
@Override
public List<DataField> parseSchemaChange() {
- if (!isSchemaChange()) {
+ if (!shouldSynchronizeCurrentTable || !isSchemaChange()) {
return Collections.emptyList();
}
@@ -194,6 +220,10 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
@Override
public Optional<Schema> parseNewTable() {
+ if (!shouldSynchronizeCurrentTable) {
+ return Optional.empty();
+ }
+
JsonNode historyRecord = payload.get("historyRecord");
if (historyRecord == null) {
return Optional.empty();
@@ -205,7 +235,26 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
if (Strings.isNullOrEmpty(ddl)) {
return Optional.empty();
}
- return schemaBuilder.build(ddl);
+
+ SQLStatement statement = SQLUtils.parseSingleStatement(ddl,
JdbcConstants.MYSQL);
+ if (!(statement instanceof MySqlCreateTableStatement)) {
+ return Optional.empty();
+ }
+
+ MySqlCreateTableStatement createTableStatement =
(MySqlCreateTableStatement) statement;
+ List<String> primaryKeys =
createTableStatement.getPrimaryKeyNames();
+ String tableName = createTableStatement.getTableName();
+ if (primaryKeys.isEmpty()) {
+ LOG.debug(
+ "Didn't find primary keys from MySQL DDL for table
'{}'. "
+ + "This table won't be synchronized.",
+ tableName);
+ excludedTables.add(tableName);
+ shouldSynchronizeCurrentTable = false;
+ return Optional.empty();
+ }
+
+ return schemaBuilder.build(createTableStatement);
} catch (Exception e) {
LOG.info("Failed to parse history record for schema changes", e);
return Optional.empty();
@@ -214,7 +263,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
@Override
public List<CdcRecord> parseRecords() {
- if (isSchemaChange()) {
+ if (!shouldSynchronizeCurrentTable || isSchemaChange()) {
return Collections.emptyList();
}
List<CdcRecord> records = new ArrayList<>();
@@ -393,4 +442,33 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
}
return keyCaseInsensitive;
}
+
+ private boolean shouldSynchronizeCurrentTable() {
+ if (excludedTables.contains(currentTable)) {
+ return false;
+ }
+
+ if (includedTables.contains(currentTable)) {
+ return true;
+ }
+
+ boolean shouldSynchronize = true;
+ if (includingPattern != null) {
+ shouldSynchronize =
includingPattern.matcher(currentTable).matches();
+ }
+ if (excludingPattern != null) {
+ shouldSynchronize =
+ shouldSynchronize &&
!excludingPattern.matcher(currentTable).matches();
+ }
+ if (!shouldSynchronize) {
+ LOG.debug(
+ "Source table {} won't be synchronized because it was
excluded. ",
+ currentTable);
+ excludedTables.add(currentTable);
+ return false;
+ }
+
+ includedTables.add(currentTable);
+ return true;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 3901d5c5e..c931fca31 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -238,6 +238,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() :
ZoneId.of(serverTimeZone);
MySqlTableSchemaBuilder schemaBuilder =
new MySqlTableSchemaBuilder(tableConfig, caseSensitive);
+ Pattern includingPattern = this.includingPattern;
+ Pattern excludingPattern = this.excludingPattern;
Boolean convertTinyint1ToBool =
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
EventParser.Factory<String> parserFactory =
() ->
@@ -246,6 +248,8 @@ public class MySqlSyncDatabaseAction extends ActionBase {
caseSensitive,
tableNameConverter,
schemaBuilder,
+ includingPattern,
+ excludingPattern,
convertTinyint1ToBool);
String database = this.database;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index ee89a5b3d..0d9561fef 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -22,20 +22,15 @@ import
org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataType;
-import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLDataType;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLName;
-import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLCharExpr;
import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
import com.alibaba.druid.sql.ast.statement.SQLTableElement;
import
com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
-import com.alibaba.druid.util.JdbcConstants;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.LinkedHashMap;
import java.util.List;
@@ -47,9 +42,7 @@ import static
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CO
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Schema builder for MySQL cdc. */
-public class MySqlTableSchemaBuilder implements NewTableSchemaBuilder<String> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(MySqlTableSchemaBuilder.class);
+public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<MySqlCreateTableStatement> {
private final Map<String, String> tableConfig;
private final boolean caseSensitive;
@@ -60,28 +53,8 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<String> {
}
@Override
- public Optional<Schema> build(String ddl) {
- SQLStatement statement = SQLUtils.parseSingleStatement(ddl,
JdbcConstants.MYSQL);
-
- if (!(statement instanceof MySqlCreateTableStatement)) {
- return Optional.empty();
- }
-
- MySqlCreateTableStatement createTableStatement =
(MySqlCreateTableStatement) statement;
- String tableName = createTableStatement.getTableName();
-
- List<String> primaryKeys = createTableStatement.getPrimaryKeyNames();
- if (primaryKeys.isEmpty()) {
- LOG.debug(
- "Didn't find primary keys from MySQL DDL for table '{}'. "
- + "This table won't be synchronized.",
- tableName);
- // TODO: for non-pk tables, we should not handle their records
because we haven't
- // created them here. Fix in 'MySqlDebeziumJsonEventParser'
- return Optional.empty();
- }
-
- List<SQLTableElement> columns =
createTableStatement.getTableElementList();
+ public Optional<Schema> build(MySqlCreateTableStatement statement) {
+ List<SQLTableElement> columns = statement.getTableElementList();
LinkedHashMap<String, Tuple2<DataType, String>> fields = new
LinkedHashMap<>();
for (SQLTableElement element : columns) {
@@ -113,6 +86,8 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<String> {
}
}
+ List<String> primaryKeys = statement.getPrimaryKeyNames();
+
if (!caseSensitive) {
LinkedHashMap<String, Tuple2<DataType, String>> tmp = new
LinkedHashMap<>();
for (Map.Entry<String, Tuple2<DataType, String>> entry :
fields.entrySet()) {
@@ -121,7 +96,7 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<String> {
!tmp.containsKey(fieldName.toLowerCase()),
"Duplicate key '%s' in table '%s' appears when
converting fields map keys to case-insensitive form.",
fieldName,
- tableName);
+ statement.getTableName());
tmp.put(fieldName.toLowerCase(), entry.getValue());
}
fields = tmp;
@@ -135,7 +110,6 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<String> {
for (Map.Entry<String, Tuple2<DataType, String>> entry :
fields.entrySet()) {
builder.column(entry.getKey(), entry.getValue().f0,
entry.getValue().f1);
}
-
Schema schema = builder.primaryKey(primaryKeys).build();
return Optional.of(schema);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 326dad0d4..ab52da8a1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -114,15 +114,15 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
return false;
}
- boolean shouldSynchroniz = true;
+ boolean shouldSynchronize = true;
if (includingPattern != null) {
- shouldSynchroniz =
includingPattern.matcher(currentTable).matches();
+ shouldSynchronize =
includingPattern.matcher(currentTable).matches();
}
if (excludingPattern != null) {
- shouldSynchroniz =
- shouldSynchroniz &&
!excludingPattern.matcher(currentTable).matches();
+ shouldSynchronize =
+ shouldSynchronize &&
!excludingPattern.matcher(currentTable).matches();
}
- if (!shouldSynchroniz) {
+ if (!shouldSynchronize) {
LOG.debug(
"Source table {} won't be synchronized because it was
excluded. ",
currentTable);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 635f2053a..1606cab2a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -768,6 +768,82 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
testNewlyAddedTable(1, true, true,
"paimon_sync_database_newly_added_tables_4");
}
+ @Test
+ public void testAddIgnoredTable() throws Exception {
+ String mySqlDatabase = "paimon_sync_database_add_ignored_table";
+
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", mySqlDatabase);
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Map<String, String> tableConfig = new HashMap<>();
+ tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1));
+ tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(2) +
2));
+
+ MySqlSyncDatabaseAction action =
+ new MySqlSyncDatabaseAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ false,
+ null,
+ null,
+ "t.+",
+ ".*a$",
+ Collections.emptyMap(),
+ tableConfig,
+ COMBINED);
+ action.build(env);
+ JobClient client = env.executeAsync();
+ waitJobRunning(client);
+
+ try (Connection conn =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(mySqlDatabase),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ Statement statement = conn.createStatement()) {
+
+ FileStoreTable table1 = getFileStoreTable("t1");
+
+ statement.executeUpdate("USE " + mySqlDatabase);
+ statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
+ statement.executeUpdate("INSERT INTO a VALUES (1, 'one')");
+
+ // make sure the job steps into incremental phase
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(10)},
+ new String[] {"k", "v1"});
+ List<String> primaryKeys = Collections.singletonList("k");
+ waitForResult(Collections.singletonList("+I[1, one]"), table1,
rowType, primaryKeys);
+
+ // create new tables at runtime
+ // synchronized table: t2
+ statement.executeUpdate("CREATE TABLE t2 (k INT, v1 VARCHAR(10),
PRIMARY KEY (k))");
+ statement.executeUpdate("INSERT INTO t2 VALUES (1, 'Hi')");
+ // not synchronized tables: ta, t3
+ statement.executeUpdate("CREATE TABLE ta (k INT, v1 VARCHAR(10),
PRIMARY KEY (k))");
+ statement.executeUpdate("INSERT INTO ta VALUES (1, 'Apache')");
+ statement.executeUpdate("CREATE TABLE t3 (k INT, v1 VARCHAR(10))");
+ statement.executeUpdate("INSERT INTO t3 VALUES (1, 'Paimon')");
+
+ statement.executeUpdate("INSERT INTO t1 VALUES (2, 'two')");
+ waitForResult(Arrays.asList("+I[1, one]", "+I[2, two]"), table1,
rowType, primaryKeys);
+
+ // check tables
+ assertTableExists(Arrays.asList("t1", "t2"));
+ assertTableNotExists(Arrays.asList("a", "ta", "t3"));
+
+ FileStoreTable newTable = getFileStoreTable("t2");
+ waitForResult(Collections.singletonList("+I[1, Hi]"), newTable,
rowType, primaryKeys);
+ }
+ }
+
public void testNewlyAddedTable(
int numOfNewlyAddedTables,
boolean testSavepointRecovery,
@@ -1094,14 +1170,17 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
waitForResult(expected, table, rowType, Arrays.asList("pk"));
}
+ private Catalog catalog() {
+ return CatalogFactory.createCatalog(CatalogContext.create(new
Path(warehouse)));
+ }
+
private FileStoreTable getFileStoreTable(String tableName) throws
Exception {
- Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
Identifier identifier = Identifier.create(database, tableName);
- return (FileStoreTable) catalog.getTable(identifier);
+ return (FileStoreTable) catalog().getTable(identifier);
}
private void assertTableExists(List<String> tableNames) {
- Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+ Catalog catalog = catalog();
for (String tableName : tableNames) {
Identifier identifier = Identifier.create(database, tableName);
assertThat(catalog.tableExists(identifier)).isTrue();
@@ -1109,7 +1188,7 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
}
private void assertTableNotExists(List<String> tableNames) {
- Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+ Catalog catalog = catalog();
for (String tableName : tableNames) {
Identifier identifier = Identifier.create(database, tableName);
assertThat(catalog.tableExists(identifier)).isFalse();
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index 5d9459273..715a6b66a 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -568,6 +568,21 @@ CREATE TABLE t2 (
PRIMARY KEY (k1, k2)
);
+CREATE DATABASE paimon_sync_database_add_ignored_table;
+USE paimon_sync_database_add_ignored_table;
+
+CREATE TABLE t1 (
+ k INT,
+ v1 VARCHAR(10),
+ PRIMARY KEY (k)
+);
+
+CREATE TABLE a (
+ k INT,
+ v1 VARCHAR(10),
+ PRIMARY KEY (k)
+);
+
CREATE DATABASE paimon_sync_table_tinyint;
USE paimon_sync_table_tinyint;
@@ -592,4 +607,4 @@ CREATE TABLE schema_evolution_5 (
v1 VARCHAR(10) comment 'v1',
v2 TINYINT(1) comment 'tinyint(1)',
PRIMARY KEY (_id)
-);
\ No newline at end of file
+);