[
https://issues.apache.org/jira/browse/FLINK-34634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17827334#comment-17827334
]
Hongshun Wang commented on FLINK-34634:
---------------------------------------
Please assign it to me.
> Restarting the job will not read the changelog anymore if it stops before the
> synchronization of meta information is complete and some table is removed
> -------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-34634
> URL: https://issues.apache.org/jira/browse/FLINK-34634
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Hongshun Wang
> Priority: Major
> Labels: pull-request-available
> Fix For: cdc-3.1.0
>
> Attachments: image-2024-03-09-15-25-26-187.png,
> image-2024-03-09-15-27-46-073.png
>
>
> h3. What's the problem
> I removed a table from the options and then restarted the job from a
> savepoint, but the job could no longer read the binlog. When I checked the
> logs, I found an ERROR-level message:
> 'The enumerator received invalid request meta group id 6, the valid meta
> group id range is [0, 4].'
> It appears that the Reader is requesting more splits than the Enumerator is
> aware of.
> However, the code should indeed remove redundant split information from the
> Reader as seen in
> [https://github.com/ververica/flink-cdc-connectors/pull/2292]. So why does
> this issue occur?
>
> h3. Why it occurs
> !image-2024-03-09-15-25-26-187.png|width=751,height=329!
> Upon examining the code, I found the cause. The issue occurs if the job
> stops before all split meta information has been synchronized and is then
> restarted. Suppose that the totalFinishedSplitSize of the binlogSplit in the
> Reader is 6 and no meta information has been synchronized yet, so the
> finishedSnapshotSplitInfos of the binlogSplit in the Reader is empty. After
> the restart, the totalFinishedSplitSize of the binlogSplit in the Reader is
> 6 - (0 - 0) = 6, but in the Enumerator it is only 4 (the removed table had
> two splits). The Reader can therefore send an out-of-range request.
> !image-2024-03-09-15-27-46-073.png|width=755,height=305!
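The arithmetic quoted above can be replayed with a small, self-contained sketch. It is only an illustration under assumptions: the class and method names are hypothetical and are not Flink CDC APIs, the adjustment formula is read off the "6 - (0 - 0)" expression in the description, and a group size of 2 is borrowed from the reproduction steps. It shows the shape of the mismatch, not the exact group IDs from the log message.

```java
/** Hypothetical sketch of the Reader/Enumerator mismatch; not Flink CDC code. */
public class MetaGroupMismatchSketch {

    /**
     * Reader side (assumed formula): the stored total is reduced only by the
     * number of split infos the Reader actually held and pruned on restart.
     */
    public static int readerTotalAfterRestart(
            int storedTotal, int infosBeforePrune, int infosAfterPrune) {
        return storedTotal - (infosBeforePrune - infosAfterPrune);
    }

    /** Number of meta groups needed to cover totalSplits (ceiling division). */
    public static int groupCount(int totalSplits, int groupSize) {
        return (totalSplits + groupSize - 1) / groupSize;
    }

    /** Enumerator side: a request is valid only if the group id exists. */
    public static boolean isValidRequest(
            int requestedGroupId, int enumeratorTotal, int groupSize) {
        return requestedGroupId >= 0
                && requestedGroupId < groupCount(enumeratorTotal, groupSize);
    }

    public static void main(String[] args) {
        int groupSize = 2;       // assumed, mirrors chunk-meta.group.size = 2
        int enumeratorTotal = 4; // Enumerator already dropped the removed table

        // No meta info was synchronized before the savepoint: nothing to prune.
        int staleTotal = readerTotalAfterRestart(6, 0, 0);
        // Meta info fully synchronized: the two removed splits are pruned.
        int syncedTotal = readerTotalAfterRestart(6, 6, 4);

        System.out.println("stale reader total  = " + staleTotal);
        System.out.println("synced reader total = " + syncedTotal);

        // The Reader keeps requesting groups until its own total is covered.
        for (int id = 0; id < groupCount(staleTotal, groupSize); id++) {
            System.out.println(
                    "group " + id + " valid = "
                            + isValidRequest(id, enumeratorTotal, groupSize));
        }
    }
}
```

In this model the Reader with the stale total of 6 asks for three groups while the Enumerator only holds two, so the last request is rejected. Had the meta information been synchronized before the savepoint, the pruned total would be 4 and every request would stay in range.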
> h3. How to reproduce
> * Add Thread.sleep(1000L) in
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#handleSourceEvents
> to postpone split meta infos synchronization.
> {code:java}
> public void handleSourceEvents(SourceEvent sourceEvent) {
>     // ... preceding branches omitted ...
>     else if (sourceEvent instanceof BinlogSplitMetaEvent) {
>         LOG.debug(
>                 "Source reader {} receives binlog meta with group id {}.",
>                 subtaskId,
>                 ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
>         // Injected delay: postpone the split meta info synchronization.
>         try {
>             Thread.sleep(1000L);
>         } catch (InterruptedException e) {
>             throw new RuntimeException(e);
>         }
>         fillMetadataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
>     }
>     // ... rest of the method omitted ...
> }
> {code}
> * Add Thread.sleep(500L) in
> com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testRemoveTablesOneByOne
> to trigger savepoint before meta infos synchronization finishes.
>
> {code:java}
> // step 2: execute insert and trigger savepoint with all tables added
> {
>     // ... omitted ...
>     waitForSinkSize("sink", fetchedDataList.size());
>     // Injected delay: trigger the savepoint before meta info synchronization finishes.
>     Thread.sleep(500L);
>     assertEqualsInAnyOrder(
>             fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
>     finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
>     jobClient.cancel().get();
> }
> // test removing table one by one, note that there should be at least one table remaining
> for (int round = 0; round < captureAddressTables.length - 1; round++) {
>     ...
> }
> {code}
>
> * Add chunk-meta.group.size = 2 in
> com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#getCreateTableStatement.
> Then run
> com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testJobManagerFailoverForRemoveTable;
> the error log will appear.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)