[ 
https://issues.apache.org/jira/browse/FLINK-34869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840280#comment-17840280
 ] 

Hongshun Wang commented on FLINK-34869:
---------------------------------------

It seems I haven't fixed it [~jmahonin] 

> [Bug][mysql] Remove all previous table and add new added table will throw 
> Exception.
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-34869
>                 URL: https://issues.apache.org/jira/browse/FLINK-34869
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Flink CDC Issue Import
>            Assignee: Josh Mahonin
>            Priority: Major
>              Labels: github-import
>
> ### Search before asking
> - [X] I searched in the 
> [issues|https://github.com/ververica/flink-cdc-connectors/issues) and found 
> nothing similar.
> ### Flink version
> 1.18
> ### Flink CDC version
> 3.0.1
> ### Database and its version
> anyone 
> ### Minimal reproduce step
> 1. Stop job in savepoint.
> 2. Set 'scan.incremental.snapshot.enabled' = 'true' and then set tableList 
> with tables which not includes in last time.
> 3. Then assign status will be chaos.
> Take a test case for example:
> ```java
> public class NewlyAddedTableITCase extends MySqlSourceTestBase {
>     @Test
>     public void testRemoveAndAddTablesOneByOne() throws Exception {
>         testRemoveAndAddTablesOneByOne(
>                 1, "address_hangzhou", "address_beijing", "address_shanghai");
>     }
>     private void testRemoveAndAddTablesOneByOne(int parallelism, String... 
> captureAddressTables)
>             throws Exception {
>         MySqlConnection connection = getConnection();
>         // step 1: create mysql tables with all tables included
>         initialAddressTables(connection, captureAddressTables);
>         final TemporaryFolder temporaryFolder = new TemporaryFolder();
>         temporaryFolder.create();
>         final String savepointDirectory = 
> temporaryFolder.newFolder().toURI().toString();
>         // get all expected data
>         List<String> fetchedDataList = new ArrayList<>();
>         String finishedSavePointPath = null;
>         // test removing and adding table one by one
>         for (int round = 0; round < captureAddressTables.length; round++] {
>             String captureTableThisRound = captureAddressTables[round];
>             String cityName = captureTableThisRound.split("_")[1];
>             StreamExecutionEnvironment env =
>                     getStreamExecutionEnvironment(finishedSavePointPath, 
> parallelism);
>             StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>             String createTableStatement =
>                     getCreateTableStatement(new HashMap<>(), 
> captureTableThisRound);
>             tEnv.executeSql(createTableStatement);
>             tEnv.executeSql(
>                     "CREATE TABLE sink ("
>                             + " table_name STRING,"
>                             + " id BIGINT,"
>                             + " country STRING,"
>                             + " city STRING,"
>                             + " detail_address STRING,"
>                             + " primary key (table_name,id) not enforced"
>                             + ") WITH ("
>                             + " 'connector' = 'values',"
>                             + " 'sink-insert-only' = 'false'"
>                             + ")");
>             TableResult tableResult = tEnv.executeSql("insert into sink 
> select * from address");
>             JobClient jobClient = tableResult.getJobClient().get();
>             // this round's snapshot data
>             fetchedDataList.addAll(
>                     Arrays.asList(
>                             format(
>                                     "+I[%s, 416874195632735147, China, %s, %s 
> West Town address 1]",
>                                     captureTableThisRound, cityName, 
> cityName),
>                             format(
>                                     "+I[%s, 416927583791428523, China, %s, %s 
> West Town address 2]",
>                                     captureTableThisRound, cityName, 
> cityName),
>                             format(
>                                     "+I[%s, 417022095255614379, China, %s, %s 
> West Town address 3]",
>                                     captureTableThisRound, cityName, 
> cityName)));
>             waitForSinkSize("sink", fetchedDataList.size());
>             assertEqualsInAnyOrder(fetchedDataList, 
> TestValuesTableFactory.getRawResults("sink"));
>             // only this round table's data is captured.
>             // step 3: make binlog data for all tables before this round(also 
> includes this round)
>             for (int i = 0; i <= round; i++) {
>                 String tableName = captureAddressTables[i];
>                 makeBinlogForAddressTable(connection, tableName, round);
>             }
>             // this round's binlog data
>             fetchedDataList.addAll(
>                     Arrays.asList(
>                             format(
>                                     "-U[%s, 416874195632735147, China, %s, %s 
> West Town address 1]",
>                                     captureTableThisRound, cityName, 
> cityName),
>                             format(
>                                     "+U[%s, 416874195632735147, CHINA_%s, %s, 
> %s West Town address 1]",
>                                     captureTableThisRound, round, cityName, 
> cityName),
>                             format(
>                                     "+I[%s, %d, China, %s, %s West Town 
> address 4]",
>                                     captureTableThisRound,
>                                     417022095255614380L + round,
>                                     cityName,
>                                     cityName)));
>             // step 4: assert fetched binlog data in this round
>             waitForSinkSize("sink", fetchedDataList.size());
>             assertEqualsInAnyOrder(fetchedDataList, 
> TestValuesTableFactory.getRawResults("sink"));
>             // step 5: trigger savepoint
>             finishedSavePointPath = triggerSavepointWithRetry(jobClient, 
> savepointDirectory);
>             jobClient.cancel().get();
>         }
>     }
>   // setting  primary key as id rather than <id, city> is more more realistic.
>    private String getCreateTableStatement(
>             Map<String, String> otherOptions, String... captureTableNames) {
>         return format(
>                 "CREATE TABLE address ("
>                         + " table_name STRING METADATA VIRTUAL,"
>                         + " id BIGINT NOT NULL,"
>                         + " country STRING,"
>                         + " city STRING,"
>                         + " detail_address STRING,"
>                         + " primary key (id) not enforced"
>                         + ") WITH ("
>                         + " 'connector' = 'mysql-cdc',"
>                         + " 'scan.incremental.snapshot.enabled' = 'true',"
>                         + " 'hostname' = '%s',"
>                         + " 'port' = '%s',"
>                         + " 'username' = '%s',"
>                         + " 'password' = '%s',"
>                         + " 'database-name' = '%s',"
>                         + " 'table-name' = '%s',"
>                         + " 'scan.incremental.snapshot.chunk.size' = '2',"
>                         + " 'server-time-zone' = 'UTC',"
>                         + " 'server-id' = '%s',"
>                         + " 'scan.newly-added-table.enabled' = 'true'"
>                         + " %s"
>                         + ")",
>                 MYSQL_CONTAINER.getHost(),
>                 MYSQL_CONTAINER.getDatabasePort(),
>                 customDatabase.getUsername(),
>                 customDatabase.getPassword(),
>                 customDatabase.getDatabaseName(),
>                 getTableNameRegex(captureTableNames),
>                 getServerId(),
>                 otherOptions.isEmpty()
>                         ? ""
>                         : ","
>                         + otherOptions.entrySet().stream()
>                         .map(
>                                 e ->
>                                         String.format(
>                                                 "'%s'='%s'",
>                                                 e.getKey(), e.getValue()))
>                         .collect(Collectors.joining(",")));
>     }
> }
> ```
> ### What did you expect to see?
> return true
> ### What did you see instead?
> An exception will occurs: 
> ```java
> org.apache.flink.util.FlinkRuntimeException: The assigner is not ready to 
> offer finished split information, this should not be called
>       at 
> com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.getFinishedSplitInfos(MySqlSnapshotSplitAssigner.java:379)
>  ~[classes/:?]
>       at 
> com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getFinishedSplitInfos(MySqlHybridSplitAssigner.java:139)
>  ~[classes/:?]
>       at 
> com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleLatestFinishedSplitNumberRequest(MySqlSourceEnumerator.java:324)
>  ~[classes/:?]
>       at 
> com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSourceEvent(MySqlSourceEnumerator.java:170)
>  ~[classes/:?]
>       at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleSourceEvent(SourceCoordinator.java:590)
>  ~[flink-runtime-1.18.0.jar:1.18.0]
>       at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$3(SourceCoordinator.java:297)
>  ~[flink-runtime-1.18.0.jar:1.18.0]
>       at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:469)
>  ~[flink-runtime-1.18.0.jar:1.18.0]
>       at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>  [flink-core-1.18.0.jar:1.18.0]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [?:1.8.0_362]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [?:1.8.0_362]
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  [?:1.8.0_362]
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  [?:1.8.0_362]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_362]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_362]
>       at java.lang.Thread.run(Thread.java:750) [?:1.8.0_362]
> ```
> ### Reason
> When restarted with newly added table, 
> MySqlBinlogSplit#filterOutdatedSplitInfos will filter previous table' 
> FinishedSnapshotSplitInfo. In this case, list of FinishedSnapshotSplitInfo 
> will be empty.
> Then when add binlog split back to split reader, 
> com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader#configureFilter
>  cBinlogSplitReader#shouldEmit will seen an empty list of 
> FinishedSnapshotSplitInfo as binlog-only job
> ```java
>     // specific offset mode
>         if (finishedSplitInfos.isEmpty()) {
>             for (TableId tableId : 
> currentBinlogSplit.getTableSchemas().keySet()) {
>                 tableIdBinlogPositionMap.put(tableId, 
> currentBinlogSplit.getStartingOffset());
>             }
>         }
>         // initial mode
>         else {
>             for (FinishedSnapshotSplitInfo finishedSplitInfo : 
> finishedSplitInfos) {
>                 TableId tableId = finishedSplitInfo.getTableId();
>                 List<FinishedSnapshotSplitInfo> list =
>                         splitsInfoMap.getOrDefault(tableId, new 
> ArrayList<>());
>                 list.add(finishedSplitInfo);
>                 splitsInfoMap.put(tableId, list);
>                 BinlogOffset highWatermark = 
> finishedSplitInfo.getHighWatermark();
>                 BinlogOffset maxHighWatermark = 
> tableIdBinlogPositionMap.get(tableId);
>                 if (maxHighWatermark == null || 
> highWatermark.isAfter(maxHighWatermark)) {
>                     tableIdBinlogPositionMap.put(tableId, highWatermark);
>                 }
>             }
>         }
> ```
> ### Are you willing to submit a PR?
> - [x] I'm willing to submit a PR!
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/apache/flink-cdc/issues/3051
> Created by: [loserwang1024|https://github.com/loserwang1024]
> Labels: bug, 
> Created at: Wed Jan 31 17:12:21 CST 2024
> State: open



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to