[
https://issues.apache.org/jira/browse/FLINK-34869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Qingsheng Ren reassigned FLINK-34869:
-------------------------------------
Assignee: Josh Mahonin
> [Bug][mysql] Removing all previous tables and adding newly added tables throws an exception.
> ------------------------------------------------------------------------------------
>
> Key: FLINK-34869
> URL: https://issues.apache.org/jira/browse/FLINK-34869
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Flink CDC Issue Import
> Assignee: Josh Mahonin
> Priority: Major
> Labels: github-import
>
> ### Search before asking
> - [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.
> ### Flink version
> 1.18
> ### Flink CDC version
> 3.0.1
> ### Database and its version
> Any
> ### Minimal reproduce step
> 1. Stop the job with a savepoint.
> 2. Set 'scan.incremental.snapshot.enabled' = 'true' and restart from the savepoint with a tableList that contains none of the previously captured tables.
> 3. The split assignment state then becomes inconsistent (a minimal sketch of the config delta follows; the full test case below reproduces it end to end).
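> In other words, the two runs differ only in the 'table-name' option. A minimal sketch of that delta (plain Java constants using the table names from the test below; everything else in the connector options stays unchanged):
> ```java
> import java.util.Collections;
> import java.util.Map;
>
> class RestartConfigDelta {
>     // Run 1: capture only address_hangzhou, then stop with a savepoint.
>     static final Map<String, String> RUN_1_OPTIONS =
>             Collections.singletonMap("table-name", "address_hangzhou");
>     // Run 2: restore from run 1's savepoint, but capture a disjoint table set.
>     static final Map<String, String> RUN_2_OPTIONS =
>             Collections.singletonMap("table-name", "address_beijing");
>     // With 'scan.newly-added-table.enabled' = 'true', restoring run 2 drops
>     // every FinishedSnapshotSplitInfo, since address_hangzhou is gone.
> }
> ```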
> Take a test case for example:
> ```java
> import static java.lang.String.format;
>
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.TableResult;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.planner.factories.TestValuesTableFactory;
>
> import io.debezium.connector.mysql.MySqlConnection;
> import org.junit.Test;
> import org.junit.rules.TemporaryFolder;
>
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.stream.Collectors;
>
> // Helper methods (getConnection, initialAddressTables, waitForSinkSize, ...)
> // are inherited from MySqlSourceTestBase.
> public class NewlyAddedTableITCase extends MySqlSourceTestBase {
>
>     @Test
>     public void testRemoveAndAddTablesOneByOne() throws Exception {
>         testRemoveAndAddTablesOneByOne(
>                 1, "address_hangzhou", "address_beijing", "address_shanghai");
>     }
>
>     private void testRemoveAndAddTablesOneByOne(int parallelism, String... captureAddressTables)
>             throws Exception {
>         MySqlConnection connection = getConnection();
>         // step 1: create mysql tables with all tables included
>         initialAddressTables(connection, captureAddressTables);
>         final TemporaryFolder temporaryFolder = new TemporaryFolder();
>         temporaryFolder.create();
>         final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
>
>         // get all expected data
>         List<String> fetchedDataList = new ArrayList<>();
>         String finishedSavePointPath = null;
>         // test removing and adding tables one by one
>         for (int round = 0; round < captureAddressTables.length; round++) {
>             String captureTableThisRound = captureAddressTables[round];
>             String cityName = captureTableThisRound.split("_")[1];
>             StreamExecutionEnvironment env =
>                     getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
>             StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>             String createTableStatement =
>                     getCreateTableStatement(new HashMap<>(), captureTableThisRound);
>             tEnv.executeSql(createTableStatement);
>             tEnv.executeSql(
>                     "CREATE TABLE sink ("
>                             + " table_name STRING,"
>                             + " id BIGINT,"
>                             + " country STRING,"
>                             + " city STRING,"
>                             + " detail_address STRING,"
>                             + " primary key (table_name,id) not enforced"
>                             + ") WITH ("
>                             + " 'connector' = 'values',"
>                             + " 'sink-insert-only' = 'false'"
>                             + ")");
>             TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
>             JobClient jobClient = tableResult.getJobClient().get();
>
>             // this round's snapshot data
>             fetchedDataList.addAll(
>                     Arrays.asList(
>                             format(
>                                     "+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
>                                     captureTableThisRound, cityName, cityName),
>                             format(
>                                     "+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
>                                     captureTableThisRound, cityName, cityName),
>                             format(
>                                     "+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
>                                     captureTableThisRound, cityName, cityName)));
>             // step 2: assert fetched snapshot data in this round;
>             // only this round's table data is captured.
>             waitForSinkSize("sink", fetchedDataList.size());
>             assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
>
>             // step 3: make binlog data for all tables before this round (also includes this round)
>             for (int i = 0; i <= round; i++) {
>                 String tableName = captureAddressTables[i];
>                 makeBinlogForAddressTable(connection, tableName, round);
>             }
>             // this round's binlog data
>             fetchedDataList.addAll(
>                     Arrays.asList(
>                             format(
>                                     "-U[%s, 416874195632735147, China, %s, %s West Town address 1]",
>                                     captureTableThisRound, cityName, cityName),
>                             format(
>                                     "+U[%s, 416874195632735147, CHINA_%s, %s, %s West Town address 1]",
>                                     captureTableThisRound, round, cityName, cityName),
>                             format(
>                                     "+I[%s, %d, China, %s, %s West Town address 4]",
>                                     captureTableThisRound,
>                                     417022095255614380L + round,
>                                     cityName,
>                                     cityName)));
>             // step 4: assert fetched binlog data in this round
>             waitForSinkSize("sink", fetchedDataList.size());
>             assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
>
>             // step 5: trigger savepoint
>             finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
>             jobClient.cancel().get();
>         }
>     }
>
>     // setting the primary key as id rather than <id, city> is more realistic.
>     private String getCreateTableStatement(
>             Map<String, String> otherOptions, String... captureTableNames) {
>         return format(
>                 "CREATE TABLE address ("
>                         + " table_name STRING METADATA VIRTUAL,"
>                         + " id BIGINT NOT NULL,"
>                         + " country STRING,"
>                         + " city STRING,"
>                         + " detail_address STRING,"
>                         + " primary key (id) not enforced"
>                         + ") WITH ("
>                         + " 'connector' = 'mysql-cdc',"
>                         + " 'scan.incremental.snapshot.enabled' = 'true',"
>                         + " 'hostname' = '%s',"
>                         + " 'port' = '%s',"
>                         + " 'username' = '%s',"
>                         + " 'password' = '%s',"
>                         + " 'database-name' = '%s',"
>                         + " 'table-name' = '%s',"
>                         + " 'scan.incremental.snapshot.chunk.size' = '2',"
>                         + " 'server-time-zone' = 'UTC',"
>                         + " 'server-id' = '%s',"
>                         + " 'scan.newly-added-table.enabled' = 'true'"
>                         + " %s"
>                         + ")",
>                 MYSQL_CONTAINER.getHost(),
>                 MYSQL_CONTAINER.getDatabasePort(),
>                 customDatabase.getUsername(),
>                 customDatabase.getPassword(),
>                 customDatabase.getDatabaseName(),
>                 getTableNameRegex(captureTableNames),
>                 getServerId(),
>                 otherOptions.isEmpty()
>                         ? ""
>                         : ","
>                                 + otherOptions.entrySet().stream()
>                                         .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
>                                         .collect(Collectors.joining(",")));
>     }
> }
> ```
> ### What did you expect to see?
> The test should pass.
> ### What did you see instead?
> An exception occurs:
> ```java
> org.apache.flink.util.FlinkRuntimeException: The assigner is not ready to offer finished split information, this should not be called
>     at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.getFinishedSplitInfos(MySqlSnapshotSplitAssigner.java:379) ~[classes/:?]
>     at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getFinishedSplitInfos(MySqlHybridSplitAssigner.java:139) ~[classes/:?]
>     at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleLatestFinishedSplitNumberRequest(MySqlSourceEnumerator.java:324) ~[classes/:?]
>     at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSourceEvent(MySqlSourceEnumerator.java:170) ~[classes/:?]
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleSourceEvent(SourceCoordinator.java:590) ~[flink-runtime-1.18.0.jar:1.18.0]
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:297) ~[flink-runtime-1.18.0.jar:1.18.0]
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:469) ~[flink-runtime-1.18.0.jar:1.18.0]
>     at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-core-1.18.0.jar:1.18.0]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_362]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_362]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_362]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_362]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_362]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_362]
>     at java.lang.Thread.run(Thread.java:750) [?:1.8.0_362]
> ```
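> For context, the exception is a readiness guard in the assigner: the enumerator asks for finished snapshot split information while the restored assigner is not in a finished state. A simplified paraphrase of such a guard (hypothetical method names, not the actual CDC source):
> ```java
> // Sketch only: MySqlSnapshotSplitAssigner.getFinishedSplitInfos refuses the
> // request unless snapshot assignment has finished (hypothetical helpers).
> public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
>     if (!isSnapshotAssignmentFinished()) { // hypothetical state check
>         throw new FlinkRuntimeException(
>                 "The assigner is not ready to offer finished split information, "
>                         + "this should not be called");
>     }
>     return allFinishedSplitInfos(); // hypothetical accessor
> }
> ```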
> ### Reason
> When the job is restarted with only newly added tables,
> MySqlBinlogSplit#filterOutdatedSplitInfos filters out the previous tables'
> FinishedSnapshotSplitInfo entries. Since none of the previously captured
> tables are captured anymore, the resulting list of FinishedSnapshotSplitInfo
> is empty.
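> A minimal sketch of that filtering effect (hypothetical helper; not the actual MySqlBinlogSplit#filterOutdatedSplitInfos signature):
> ```java
> import java.util.List;
> import java.util.Set;
> import java.util.stream.Collectors;
>
> class FilterSketch {
>     /** Keeps only split infos whose table is still captured after the restart. */
>     static List<FinishedSnapshotSplitInfo> filterOutdated(
>             List<FinishedSnapshotSplitInfo> restoredInfos,
>             Set<TableId> currentCapturedTables) {
>         // When the new table list shares no table with the old one,
>         // nothing matches and the result is an empty list.
>         return restoredInfos.stream()
>                 .filter(info -> currentCapturedTables.contains(info.getTableId()))
>                 .collect(Collectors.toList());
>     }
> }
> ```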
> Then, when the binlog split is added back to the split reader,
> com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader#configureFilter
> and BinlogSplitReader#shouldEmit treat the empty list of
> FinishedSnapshotSplitInfo as if this were a binlog-only job:
> ```java
> // specific offset mode
> if (finishedSplitInfos.isEmpty()) {
>     for (TableId tableId : currentBinlogSplit.getTableSchemas().keySet()) {
>         tableIdBinlogPositionMap.put(tableId, currentBinlogSplit.getStartingOffset());
>     }
> }
> // initial mode
> else {
>     for (FinishedSnapshotSplitInfo finishedSplitInfo : finishedSplitInfos) {
>         TableId tableId = finishedSplitInfo.getTableId();
>         List<FinishedSnapshotSplitInfo> list =
>                 splitsInfoMap.getOrDefault(tableId, new ArrayList<>());
>         list.add(finishedSplitInfo);
>         splitsInfoMap.put(tableId, list);
>
>         BinlogOffset highWatermark = finishedSplitInfo.getHighWatermark();
>         BinlogOffset maxHighWatermark = tableIdBinlogPositionMap.get(tableId);
>         if (maxHighWatermark == null || highWatermark.isAfter(maxHighWatermark)) {
>             tableIdBinlogPositionMap.put(tableId, highWatermark);
>         }
>     }
> }
> ```
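> To make the misclassification concrete: an empty list is indistinguishable from a job that never had a snapshot phase, so the restored job falls into the "specific offset mode" branch, and a later latest-finished-splits-number request (see the stack trace above) reaches the assigner before it is ready. A stub demonstration of the branch flip (plain Java, not CDC code):
> ```java
> import java.util.Collections;
> import java.util.List;
>
> class BranchFlipDemo {
>     // Mirrors the if/else above on the one property that decides the branch.
>     static String classify(List<?> finishedSplitInfos) {
>         return finishedSplitInfos.isEmpty() ? "specific offset mode" : "initial mode";
>     }
>
>     public static void main(String[] args) {
>         // After filterOutdatedSplitInfos has dropped every previous table:
>         System.out.println(classify(Collections.emptyList())); // specific offset mode
>     }
> }
> ```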
> ### Are you willing to submit a PR?
> - [x] I'm willing to submit a PR!
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/apache/flink-cdc/issues/3051
> Created by: [loserwang1024|https://github.com/loserwang1024]
> Labels: bug
> Created at: Wed Jan 31 17:12:21 CST 2024
> State: open
--
This message was sent by Atlassian Jira
(v8.20.10#820010)