loserwang1024 opened a new pull request, #3134: URL: https://github.com/apache/flink-cdc/pull/3134
### What's the problem

I removed a table from the option and then restarted the job from a savepoint, after which the job could no longer read the binlog. The logs contained an ERROR-level entry: 'The enumerator received invalid request meta group id 6, the valid meta group id range is [0, 4].' It appears that the Reader is requesting more split meta groups than the Enumerator knows about. However, the code is already supposed to remove redundant split information from the Reader, as done in https://github.com/ververica/flink-cdc-connectors/pull/2292. So why does this issue occur?

### Why it occurs

After examining the code, I found the cause: the issue occurs if the job stops before the split meta information has been fully synchronized to the Reader and then restarts.

Suppose that the totalFinishedSplitSize of binlogSplit in the Reader is 6 and no meta information has been synchronized yet, so the finishedSnapshotSplitInfos of binlogSplit in the Reader is empty. After the restart, the list is empty both before and after the removed table is filtered out, so the totalFinishedSplitSize of binlogSplit in the Reader is (6 - (0 - 0)), which is still 6. In the Enumerator, however, it is only 4 (the removed table had two splits). The Reader therefore keeps requesting meta groups that the Enumerator does not have, which leads to an out-of-range request.

### How to reproduce

- Add Thread.sleep(1000L) in com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#handleSourceEvents to postpone the synchronization of split meta infos.

```java
public void handleSourceEvents(SourceEvent sourceEvent) {
    // ... preceding branches omitted ...
    else if (sourceEvent instanceof BinlogSplitMetaEvent) {
        LOG.debug(
                "Source reader {} receives binlog meta with group id {}.",
                subtaskId,
                ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
        // Delay handling so that a savepoint can be taken before the meta sync finishes.
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        fillMetadataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
    }
    // ... remaining branches omitted ...
}
```

- Add Thread.sleep(500L) in com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testRemoveTablesOneByOne to trigger the savepoint before the meta info synchronization finishes.

```java
// step 2: execute insert and trigger savepoint with all tables added
{
    // ... omitted ...
    waitForSinkSize("sink", fetchedDataList.size());
    // Short pause so the savepoint is triggered while the meta sync is still in progress.
    Thread.sleep(500L);
    assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
    finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
    jobClient.cancel().get();
}

// test removing table one by one, note that there should be at least one table remaining
for (int round = 0; round < captureAddressTables.length - 1; round++) {
    ...
}
```

- Add chunk-meta.group.size = 2 in com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#getCreateTableStatement.

Finally, run the test (com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testJobManagerFailoverForRemoveTable); the error log will appear.
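
The root cause described above can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the connector's code: it assumes the Reader asks for the group id `received / groupSize` until it holds `totalFinishedSplitSize` infos, and that the Enumerator serves fixed-size groups built from its own finished split list. The class and method names (`MetaGroupMismatchSketch`, `firstInvalidGroupId`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the Reader/Enumerator meta sync; all names are hypothetical. */
public class MetaGroupMismatchSketch {

    /** Splits the enumerator's finished-split list into fixed-size meta groups. */
    static List<List<Integer>> partition(List<Integer> infos, int groupSize) {
        List<List<Integer>> groups = new ArrayList<>();
        for (int i = 0; i < infos.size(); i += groupSize) {
            int end = Math.min(i + groupSize, infos.size());
            groups.add(new ArrayList<>(infos.subList(i, end)));
        }
        return groups;
    }

    /**
     * Returns the first meta group id the enumerator cannot serve,
     * or -1 if the reader collects everything it expects.
     */
    static int firstInvalidGroupId(int readerTotal, int enumeratorTotal, int groupSize) {
        List<Integer> enumeratorInfos = new ArrayList<>();
        for (int i = 0; i < enumeratorTotal; i++) {
            enumeratorInfos.add(i);
        }
        List<List<Integer>> groups = partition(enumeratorInfos, groupSize);

        int received = 0; // size of the reader's finishedSnapshotSplitInfos in this model
        while (received < readerTotal) {
            // Assumption: the next group id is derived from what the reader already holds.
            int requestedGroupId = received / groupSize;
            if (requestedGroupId >= groups.size()) {
                return requestedGroupId; // out-of-range request
            }
            received += groups.get(requestedGroupId).size();
        }
        return -1;
    }

    public static void main(String[] args) {
        // Reader still believes in 6 splits, enumerator only has 4, group size 2.
        System.out.println(firstInvalidGroupId(6, 4, 2)); // prints 2
        // Both sides agree on 4 splits: the sync completes.
        System.out.println(firstInvalidGroupId(4, 4, 2)); // prints -1
    }
}
```

In this model the stale total on the Reader side is the only thing that triggers the invalid request, which matches the scenario where totalFinishedSplitSize is not reduced because finishedSnapshotSplitInfos was still empty at restore time. The concrete group ids in the real error log depend on the actual split counts and group size.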
