liaorui opened a new pull request, #8364: URL: https://github.com/apache/inlong/pull/8364
…hot phase

### Prepare a Pull Request

- Fixes #8363

### Motivation

In the snapshot phase, the MySQL connector keeps capturing the binlog of tables even after those tables have been removed from the Flink SQL.

For example, suppose the Flink SQL captures table1, table2, and table3, and table1 holds a large amount of data. The MySQL connector's chunk splitter splits table1 into many chunks and reads the binlog file sequentially, so it takes a long time to finish reading table1. If we no longer want to capture table1, we remove it from the Flink SQL, pause the Flink job with a checkpoint, and then start the job again. Even though table1 no longer exists in the Flink SQL, the MySQL connector still captures it: the job restores its state from the last checkpoint, and table1 is still recorded in that checkpoint. The discarded tables therefore need to be handled when the job restores from a checkpoint.

### Modifications

The `restoreEnumerator` method of `MySqlSource` is the entry point where the job restores its state. `SnapshotPendingSplitsState` records the already processed tables, the remaining tables, and the remaining splits. We get the captured table list from Debezium and remove the discarded tables from the remaining tables in `SnapshotPendingSplitsState`. We also remove the splits that belong to the discarded tables. However, the last split is still processed in `MySqlSnapshotSplitReadTask`, so that split must be ignored there.

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as:
  *(please describe tests)*
- [ ] This change added tests and can be verified as follows:
  *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
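The restore-time filtering described under Modifications can be sketched roughly as follows. This is a simplified, hypothetical model, not the actual InLong or Flink CDC code: `DiscardedTableFilter`, `PendingState`, `SnapshotSplit`, `filter`, and `shouldSkip` are illustrative names, and the captured table set stands in for the table list obtained from Debezium. It shows the two steps: dropping discarded tables and their splits from the restored pending state, and a guard that lets the reader ignore a split of a discarded table that is still processed after the restart.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical model of restore-time filtering; NOT the real connector classes. */
public class DiscardedTableFilter {

    /** Stand-in for a snapshot chunk (split) of one table. */
    static final class SnapshotSplit {
        final String tableId;
        final int chunkId;

        SnapshotSplit(String tableId, int chunkId) {
            this.tableId = tableId;
            this.chunkId = chunkId;
        }
    }

    /** Stand-in for the parts of SnapshotPendingSplitsState touched here. */
    static final class PendingState {
        final List<String> alreadyProcessedTables;
        final List<String> remainingTables;
        final List<SnapshotSplit> remainingSplits;

        PendingState(List<String> alreadyProcessedTables,
                     List<String> remainingTables,
                     List<SnapshotSplit> remainingSplits) {
            this.alreadyProcessedTables = alreadyProcessedTables;
            this.remainingTables = remainingTables;
            this.remainingSplits = remainingSplits;
        }
    }

    /** Keeps only the remaining tables and splits that are still captured. */
    static PendingState filter(PendingState restored, Set<String> capturedTables) {
        List<String> tables = restored.remainingTables.stream()
                .filter(capturedTables::contains)
                .collect(Collectors.toList());
        List<SnapshotSplit> splits = restored.remainingSplits.stream()
                .filter(split -> capturedTables.contains(split.tableId))
                .collect(Collectors.toList());
        // Already processed tables are left untouched in this sketch.
        return new PendingState(restored.alreadyProcessedTables, tables, splits);
    }

    /** Reader-side guard: true if the split belongs to a discarded table. */
    static boolean shouldSkip(SnapshotSplit split, Set<String> capturedTables) {
        return !capturedTables.contains(split.tableId);
    }

    public static void main(String[] args) {
        // table1 was removed from the Flink SQL before the restart.
        Set<String> captured = Set.of("db.table2", "db.table3");
        PendingState restored = new PendingState(
                List.of(),
                List.of("db.table1", "db.table2", "db.table3"),
                List.of(new SnapshotSplit("db.table1", 7),
                        new SnapshotSplit("db.table1", 8),
                        new SnapshotSplit("db.table2", 0)));

        PendingState filtered = filter(restored, captured);
        System.out.println(filtered.remainingTables);        // [db.table2, db.table3]
        System.out.println(filtered.remainingSplits.size()); // 1
        System.out.println(shouldSkip(new SnapshotSplit("db.table1", 6), captured)); // true
    }
}
```

Filtering in the enumerator stops new splits of discarded tables from being assigned, while the reader-side check covers the split that is still processed after the restart; this split of responsibilities is an assumption of the sketch.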
