This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch revert-1743-fix_excluding_sync in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit eeb393135a1406d444c7803b695ce071a6701707 Author: tsreaper <[email protected]> AuthorDate: Wed Aug 9 10:22:29 2023 +0800 Revert "[flink] Sync the table with same name in different db (#1743)" This reverts commit 28bd193488bfee3a0b033eb38d6b72a939d6c00d. --- docs/content/how-to/cdc-ingestion.md | 6 +-- .../shortcodes/generated/mysql_sync_database.html | 2 +- .../cdc/mysql/MySqlDebeziumJsonEventParser.java | 5 +- .../action/cdc/mysql/MySqlSyncDatabaseAction.java | 13 +++-- .../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 63 +++------------------- .../mysql/MySqlSyncDatabaseTableListITCase.java | 4 +- 6 files changed, 24 insertions(+), 69 deletions(-) diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md index f102753bf..4b13d0d08 100644 --- a/docs/content/how-to/cdc-ingestion.md +++ b/docs/content/how-to/cdc-ingestion.md @@ -196,7 +196,7 @@ under database `source_db`. The command to submit the job looks like: --table-conf bucket=4 \ --table-conf changelog-producer=input \ --table-conf sink.parallelism=4 \ - --including-tables 'source_db.product|source_db.user|source_db.address' + --including-tables 'product|user|address' ``` At a later point we would like the job to also synchronize tables [order, custom], @@ -222,7 +222,7 @@ The command to recover from previous snapshot and add new tables to synchronize --catalog-conf metastore=hive \ --catalog-conf uri=thrift://hive-metastore:9083 \ --table-conf bucket=4 \ - --including-tables 'source_db.product|source_db.user|source_db.address|source_db.order|source_db.custom' + --including-tables 'product|user|address|order|custom' ``` {{< hint info >}} @@ -249,7 +249,7 @@ synchronize all the `db.+.tbl.+` into tables `test_db.tbl1`, `test_db.tbl2` ... 
--table-conf bucket=4 \ --table-conf changelog-producer=input \ --table-conf sink.parallelism=4 \ - --including-tables 'db.+.tbl.+' + --including-tables 'tbl.+' ``` By setting database-name to a regular expression, the synchronization job will capture all tables under matched databases diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html b/docs/layouts/shortcodes/generated/mysql_sync_database.html index 6df0c04bc..fe6f3d5e3 100644 --- a/docs/layouts/shortcodes/generated/mysql_sync_database.html +++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html @@ -51,7 +51,7 @@ under the License. </tr> <tr> <td><h5>--including-tables</h5></td> - <td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'db1.a|db2.b|db2.c'.Regular expression is supported, for example, specifying "--including-tables db1.test|db2.paimon.*" means to synchronize table 'db1.test' and all tables start with 'db2.paimon'.</td> + <td>It is used to specify which source tables are to be synchronized. 
You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'a|b|c'.Regular expression is supported, for example, specifying "--including-tables test|paimon.*" means to synchronize table 'test' and all tables start with 'paimon'.</td> </tr> <tr> <td><h5>--excluding-tables</h5></td> diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java index 344bc087c..418db64d2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java @@ -155,7 +155,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> { try { root = objectMapper.readValue(rawEvent, JsonNode.class); payload = root.get("payload"); - currentTable = getDatabaseName() + "." 
+ payload.get("source").get("table").asText(); + currentTable = payload.get("source").get("table").asText(); shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable(); } catch (Exception e) { throw new RuntimeException(e); @@ -164,8 +164,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> { @Override public String parseTableName() { - String tableName = payload.get("source").get("table").asText(); - return tableNameConverter.convert(Identifier.create(getDatabaseName(), tableName)); + return tableNameConverter.convert(Identifier.create(getDatabaseName(), currentTable)); } private boolean isSchemaChange() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index 5a6819ee1..a6746904e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -295,8 +295,7 @@ public class MySqlSyncDatabaseAction extends ActionBase { schema.identifier()); return false; } - return shouldMonitorTable( - schema.identifier().getDatabaseName() + "." 
+ schema.tableName()); + return shouldMonitorTable(schema.tableName()); }; } @@ -410,8 +409,14 @@ public class MySqlSyncDatabaseAction extends ActionBase { // a table can be monitored only when its name meets the including pattern and doesn't // be excluded by excluding pattern at the same time + String includingPattern = + String.format( + "%s%s(%s)", + mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME), + separatorRex, + includingTables); if (excludedTables.isEmpty()) { - return includingTables; + return includingPattern; } String excludingPattern = @@ -425,7 +430,7 @@ public class MySqlSyncDatabaseAction extends ActionBase { + t.getObjectName())) .collect(Collectors.joining("|")); excludingPattern = "?!" + excludingPattern; - return String.format("(%s)(%s)", excludingPattern, includingTables); + return String.format("(%s)(%s)", excludingPattern, includingPattern); } throw new UnsupportedOperationException("Unknown DatabaseSyncMode: " + mode); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java index 379f916dc..eeb76d8be 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java @@ -563,7 +563,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase { public void testIncludingTables() throws Exception { includingAndExcludingTablesImpl( "paimon_sync_database_including", - "paimon_sync_database_including.flink|paimon_sync_database_including.paimon.+", + "flink|paimon.+", null, Arrays.asList("flink", "paimon_1", "paimon_2"), Collections.singletonList("ignored")); @@ -575,67 +575,18 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase { includingAndExcludingTablesImpl( "paimon_sync_database_excluding", null, - "paimon_sync_database_excluding.flink|paimon_sync_database_excluding.paimon.+", + "flink|paimon.+", Collections.singletonList("sync"), Arrays.asList("flink", "paimon_1", "paimon_2")); } - @Test - @Timeout(60) - public void testSameTableNameInDifferentDatabase() throws Exception { - String databaseName = "paimon_sync_database_excluding|paimon_sync_database_including"; - String includingTables = "paimon_sync_database_including.paimon_1"; - String excludingTables = "paimon_sync_database_excluding.paimon_1"; - - Map<String, String> mySqlConfig = getBasicMySqlConfig(); - mySqlConfig.put("database-name", databaseName); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(2); - env.enableCheckpointing(1000); - env.setRestartStrategy(RestartStrategies.noRestart()); - - Map<String, String> tableConfig = getBasicTableConfig(); - MySqlSyncDatabaseAction action = - new MySqlSyncDatabaseAction( - mySqlConfig, - warehouse, - database, - false, - true, - null, - null, - includingTables, - excludingTables, - Collections.emptyMap(), - tableConfig, - DIVIDED); - action.build(env); - JobClient client = env.executeAsync(); - waitJobRunning(client); - - try (Statement statement = getStatement()) { - statement.executeUpdate( - "INSERT INTO paimon_sync_database_including.paimon_1 VALUES (1),(2)"); - statement.executeUpdate( - "INSERT INTO paimon_sync_database_excluding.paimon_1 VALUES (3),(4)"); - - FileStoreTable table = getFileStoreTable("paimon_1"); - RowType rowType = - RowType.of(new DataType[] {DataTypes.INT().notNull()}, new String[] {"k"}); - List<String> primaryKeys = Collections.singletonList("k"); - // only the paimon_sync_database_including.paimon_1 is synchronized - waitForResult(Arrays.asList("+I[1]", "+I[2]"), table, rowType, primaryKeys); - } - } - @Test @Timeout(60) public void testIncludingAndExcludingTables() 
throws Exception { includingAndExcludingTablesImpl( "paimon_sync_database_in_excluding", - "paimon_sync_database_in_excluding.flink|paimon_sync_database_in_excluding.paimon.+", - "paimon_sync_database_in_excluding.paimon_1", + "flink|paimon.+", + "paimon_1", Arrays.asList("flink", "paimon_2"), Arrays.asList("paimon_1", "test")); } @@ -805,8 +756,8 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase { true, null, null, - mySqlDatabase + ".t.+", - mySqlDatabase + "..*a$", + "t.+", + ".*a$", Collections.emptyMap(), tableConfig, COMBINED); @@ -1106,7 +1057,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase { true, null, null, - databaseName + ".t.+", + "t.+", null, catalogConfig, tableConfig, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java index 657b1ab56..faf9f2977 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java @@ -69,8 +69,8 @@ public class MySqlSyncDatabaseTableListITCase extends MySqlActionITCaseBase { false, null, null, - ".*shard_.*\\.t.+|.*shard_.*\\.s.+", - ".*shard_.*\\.ta|.*shard_.*\\.sa", + "t.+|s.+", + "ta|sa", Collections.emptyMap(), tableConfig, mode);
