[ 
https://issues.apache.org/jira/browse/FLINK-34634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17827334#comment-17827334
 ] 

Hongshun Wang commented on FLINK-34634:
---------------------------------------

Please assign it to me.

> Restarting the job will not read the changelog anymore if it stops before the 
> synchronization of meta information is complete and some table is removed
> -------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34634
>                 URL: https://issues.apache.org/jira/browse/FLINK-34634
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Hongshun Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: cdc-3.1.0
>
>         Attachments: image-2024-03-09-15-25-26-187.png, 
> image-2024-03-09-15-27-46-073.png
>
>
> h3. What's the problem
> Once, I removed a table from the option and then restarted the job from the 
> savepoint, but the job couldn't read the binlog anymore. When I checked the 
> logs, I found an Error level log stating:
> ' The enumerator received invalid request meta group id 6, the valid meta 
> group id range is [0, 4].'
> It appears that the Reader is requesting more splits than the Enumerator is 
> aware of.
> However, the code should indeed remove redundant split information from the 
> Reader as seen in 
> [https://github.com/ververica/flink-cdc-connectors/pull/2292]. So why does 
> this issue occur?
>  
> h3. why occurs
> !image-2024-03-09-15-25-26-187.png|width=751,height=329!
> Upon examining the code, I discovered the cause. If the job stops before 
> completing all the split meta information and then restarts, this issue 
> occurs. Suppose that the totalFinishedSplitSize of binlogSplit in the Reader 
> is 6, and no meta information has been synchronized, leaving the 
> finishedSnapshotSplitInfos of binlogSplit in the Reader empty. After 
> restarting, the totalFinishedSplitSize of binlogSplit in the Reader equals (6 
> - (0 - 0)) which is still 6, but in the Enumerator, it is only 4(the removed 
> table have two split). This could lead to an out-of-range request.
> !image-2024-03-09-15-27-46-073.png|width=755,height=305!
> h3. How to reproduce
>  * Add Thread.sleep(1000L) in 
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#handleSourceEvents
>  to postpone split meta infos synchronization.
> {code:java}
> public void handleSourceEvents(SourceEvent sourceEvent) {
> else if (sourceEvent instanceof BinlogSplitMetaEvent) {
>     LOG.debug(
>             "Source reader {} receives binlog meta with group id {}.",
>             subtaskId,
>             ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
>     try {
>         Thread.sleep(1000L);
>     } catch (InterruptedException e) {
>         throw new RuntimeException(e);
>     }
>     fillMetadataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
> } {code}
>  * Add Thread.sleep(500L) in 
> com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testRemoveTablesOneByOne
>  to trigger savepoint before meta infos synchronization finishes.
>  
> {code:java}
> // step 2: execute insert and trigger savepoint with all tables added
> {
>     // ..ingore 
>     waitForSinkSize("sink", fetchedDataList.size());
>     Thread.sleep(500L);
>     assertEqualsInAnyOrder(fetchedDataList, 
> TestValuesTableFactory.getRawResults("sink"));
>     finishedSavePointPath = triggerSavepointWithRetry(jobClient, 
> savepointDirectory);
>     jobClient.cancel().get();
> }
> // test removing table one by one, note that there should be at least one 
> table remaining
> for (int round = 0; round < captureAddressTables.length - 1; round++) {
> ...
> }
> {code}
>  
>  * Add chunk-meta.group.size  =2 in 
> com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#getCreateTableStatement
> Then, run 
> test(com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testJobManagerFailoverForRemoveTable),
>  the error log will occur.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to