loserwang1024 opened a new pull request, #3134:
URL: https://github.com/apache/flink-cdc/pull/3134

   ### What's the problem
   After I removed a table from the table option and restarted the job from a 
savepoint, the job could no longer read the binlog. The logs contained an 
ERROR-level message: 'The enumerator received invalid request meta group id 6, 
the valid meta group id range is [0, 4].'
   
   It appears that the Reader is requesting meta for more splits than the 
Enumerator is aware of.
   However, the code is supposed to remove redundant split information from 
the Reader, as done in 
https://github.com/ververica/flink-cdc-connectors/pull/2292. 
So why does this issue still occur?
   
   ### Why it occurs
   
![image](https://github.com/apache/flink-cdc/assets/125648852/ab12311f-94a4-4e2d-aff4-02891ad1e668)
   Examining the code revealed the cause: the issue occurs if the job stops 
before all split meta information has been synchronized to the Reader and is 
then restarted. Suppose the totalFinishedSplitSize of the binlogSplit in the 
Reader is 6 and no meta information has been synchronized yet, so the 
finishedSnapshotSplitInfos of the binlogSplit in the Reader is empty. After the 
restart, the totalFinishedSplitSize of the binlogSplit in the Reader is 
6 - (0 - 0) = 6, but in the Enumerator it is only 4 (the removed table had two 
splits). The Reader therefore keeps requesting meta beyond what the Enumerator 
holds, which can lead to an out-of-range request.
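
The arithmetic above can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the connector's code: the class and method names are hypothetical, and it assumes the Reader requests group `received / groupSize` until it has received `totalFinishedSplitSize` entries, while the Enumerator only accepts ids below the number of groups it actually holds.

```java
// Hypothetical, simplified model of the Reader/Enumerator meta-group exchange.
// It does not use any Flink CDC classes.
class MetaGroupMismatchSketch {

    /** Number of meta groups needed to cover splitCount splits (ceiling division). */
    static int groupCount(int splitCount, int groupSize) {
        return (splitCount + groupSize - 1) / groupSize;
    }

    /**
     * Simulates the Reader requesting meta groups one by one.
     * Returns the first group id the Enumerator would reject,
     * or -1 if synchronization completes normally.
     */
    static int firstInvalidRequest(int readerTotal, int enumeratorTotal, int groupSize) {
        int received = 0; // size of finishedSnapshotSplitInfos on the Reader
        int validGroups = groupCount(enumeratorTotal, groupSize);
        while (received < readerTotal) {
            int requested = received / groupSize;
            if (requested >= validGroups) {
                return requested; // out-of-range request
            }
            // The Enumerator answers with the splits belonging to this group.
            int end = Math.min((requested + 1) * groupSize, enumeratorTotal);
            received = end;
        }
        return -1;
    }

    public static void main(String[] args) {
        // Reader still believes there are 6 finished splits, Enumerator holds 4.
        System.out.println(firstInvalidRequest(6, 4, 2));
        // Both sides agree on 4 finished splits.
        System.out.println(firstInvalidRequest(4, 4, 2));
    }
}
```

With a group size of 2, the Enumerator in this model holds two groups (ids 0 and 1), so the Reader's third request is the one that falls out of range.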
   
   ### How to reproduce
   
   - Add Thread.sleep(1000L) in 
com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#handleSourceEvents
 to delay synchronization of the split meta information.
   
   ```java
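   public void handleSourceEvents(SourceEvent sourceEvent) {
       // ... earlier branches omitted ...
       else if (sourceEvent instanceof BinlogSplitMetaEvent) {
           LOG.debug(
                   "Source reader {} receives binlog meta with group id {}.",
                   subtaskId,
                   ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
           // Added for reproduction: delay handling so that the savepoint is
           // taken before the split meta is fully synchronized.
           try {
               Thread.sleep(1000L);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new RuntimeException(e);
           }
           fillMetadataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
       }
       // ... later branches omitted ...
   }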
   ```
   
   - Add Thread.sleep(500L) in 
com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testRemoveTablesOneByOne
 to trigger the savepoint before synchronization of the meta information 
finishes.
   ```java
   // step 2: execute insert and trigger savepoint with all tables added
   {
       // ... omitted ...
   
       waitForSinkSize("sink", fetchedDataList.size());
       Thread.sleep(500L);
       assertEqualsInAnyOrder(
               fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
       finishedSavePointPath =
               triggerSavepointWithRetry(jobClient, savepointDirectory);
       jobClient.cancel().get();
   }
   
   // test removing table one by one, note that there should be at least one
   // table remaining
   for (int round = 0; round < captureAddressTables.length - 1; round++) {
       // ...
   }
   ```
   
   - Add chunk-meta.group.size = 2 in 
com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#getCreateTableStatement
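
For illustration, the option could be set in a table DDL roughly as follows. This is a minimal sketch, not the test's actual getCreateTableStatement: the class name, method signature, column list, and all connection values are hypothetical placeholders, and only the 'chunk-meta.group.size' line reflects the step above.

```java
// Hypothetical helper that builds a mysql-cdc table DDL with a small meta group size.
class ChunkMetaGroupSizeDdlSketch {

    static String createTableStatement(String tableName, int chunkMetaGroupSize) {
        return String.format(
                "CREATE TABLE address ("
                        + " id BIGINT NOT NULL,"
                        + " city STRING,"
                        + " PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'mysql-cdc',"
                        + " 'hostname' = 'localhost',"   // placeholder
                        + " 'port' = '3306',"            // placeholder
                        + " 'username' = 'user',"        // placeholder
                        + " 'password' = 'pass',"        // placeholder
                        + " 'database-name' = 'db',"     // placeholder
                        + " 'table-name' = '%s',"
                        // Small group size so the split meta is sent in several groups.
                        + " 'chunk-meta.group.size' = '%d'"
                        + ")",
                tableName, chunkMetaGroupSize);
    }

    public static void main(String[] args) {
        System.out.println(createTableStatement("address_.*", 2));
    }
}
```

A small group size means the split meta needs several round trips to synchronize, which widens the window in which a savepoint can capture a partially synchronized state.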
   
   Finally, run the test 
com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testJobManagerFailoverForRemoveTable; 
the error log quoted above is produced.
   
   
   
   

