[ https://issues.apache.org/jira/browse/FLINK-38270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18018213#comment-18018213 ]
Sergei Morozov commented on FLINK-38270:
----------------------------------------

{quote}Each time after restart, the splits are reassigned by enumerator, then all the information can be same.
{quote}

As I understand it, this is the key improvement this FLIP will provide: the state of the incremental replication won't be preserved between restarts. Instead, the state will be replicated from scratch every time.

I think this would indeed fix the issue, but achieving this behavior doesn't strictly require this FLIP. In my implementation, once the reader is restored from the state, it provides a checksum of the metadata to the enumerator. If the checksum doesn't match, the replication starts over. If it matches, the replication may proceed without discarding the state.

I agree that this protocol could be greatly simplified once this FLIP is accepted, but I don't think it's a hard dependency. The issue at hand can be fixed within the existing framework.

> MySQL CDC source may ignore newly added tables while reading the binlog
> (scenario 2)
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-38270
>                 URL: https://issues.apache.org/jira/browse/FLINK-38270
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.2.0
>            Reporter: Sergei Morozov
>            Priority: Major
>
> The logic of removing finished snapshot split infos for no longer existing
> tables is inconsistent between the enumerator and the source reader. This may
> lead to a situation where the infos of an old table are retained on the reader
> but the infos of the new one aren’t replicated. As a result, the binlog events
> for the newly added table will be skipped.
>
> h3. Discrepancy in the info subtraction logic
> When a table is no longer captured by the connector, it’s removed from the
> enumerator’s and the reader’s state.
> h4. Enumerator
> # Discover tables that match the include list.
> # Compare them with the tables from the state.
> # Remove the state that corresponds to the no longer captured tables.
> As a result:
> # If the table is no longer included in the configuration, it’s *removed*
> from the state.
> # If the table no longer exists, it is *also removed* from the state.
> h4. Reader
> # Iterate over the finished snapshot split infos.
> # Remove all infos whose tables are no longer included in the
> configuration.
> As a result:
> # If the table is no longer included in the configuration, it’s *removed*
> from the state.
> # If the table no longer exists, it _is *not*_ *removed* from the state,
> because the reader doesn’t know that the table no longer exists.
> h3. Impact of the discrepancy on the binlog split metadata replication
> The replication logic uses the number of split infos on each side as the
> indicator of completion and relies on the no longer relevant infos being
> subtracted consistently on both sides. Because an info that’s subtracted from
> the enumerator’s state may not be subtracted from the reader’s, the info of a
> newly snapshotted table may never be replicated to the reader.
> h3. Steps to reproduce
> # Create a source connection that captures tables {{A}} and {{B}}.
> # Start the connection and wait until it reaches the steady state.
> # Stop the connection.
> # Drop table {{A}} in the source database.
> # Start the connection.
> # Observe the state of the enumerator and the reader:
> ## The enumerator’s finished split infos will only contain B (A no longer
> exists, so it’s subtracted from the state).
> ## The reader’s finished split infos will contain A and B (both still match
> the include list).
> # Stop the connection.
> # Add table C to the source configuration.
> # Start the connection.
> # Observe the state of the enumerator and the reader:
> ## The enumerator’s finished split infos will contain B and C (the total
> number is 2).
> ## The reader’s finished split infos will contain A and B (both still match
> the include list, and the total number is also 2).
> # Make data changes in C and confirm that they are not captured.
> h3. Observing state
> h4. Enumerator
> # Set a breakpoint on {{MySqlSourceEnumerator#snapshotState()}}.
> # Evaluate
> {{((MySqlHybridSplitAssigner)splitAssigner).snapshotSplitAssigner.splitFinishedOffsets}}
> h4. Reader
> # Set a breakpoint on {{MySqlSplitReader#fetch()}}.
> # Evaluate
> {{((MySqlBinlogSplit)((BinlogSplitReader)this.currentReader).currentBinlogSplit).finishedSnapshotSplitInfos}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
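The subtraction discrepancy from the issue, and the checksum comparison proposed in the comment, can be illustrated with a small Java simulation. This is a minimal sketch under explicit assumptions. Split infos are modeled as plain table names. `enumeratorPrune`, `readerPrune`, `countSaysComplete` and `checksum` are hypothetical helpers, not Flink CDC APIs. CRC32 over the sorted names is only one possible metadata checksum. The sketch replays the steps to reproduce: both sides end up with two infos each, so a count-based check reports completion even though the reader is missing C, while a checksum comparison detects the mismatch and restarts the replication.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.zip.CRC32;

public class SplitInfoDiscrepancySketch {

    // Hypothetical model of the enumerator-side subtraction: keep only tables
    // that still match the include list AND still exist in the database.
    static Set<String> enumeratorPrune(Set<String> state, Set<String> included, Set<String> existing) {
        Set<String> result = new TreeSet<>();
        for (String table : state) {
            if (included.contains(table) && existing.contains(table)) {
                result.add(table);
            }
        }
        return result;
    }

    // Hypothetical model of the reader-side subtraction: the reader only knows
    // the include list, so a dropped table that is still included survives.
    static Set<String> readerPrune(Set<String> state, Set<String> included) {
        Set<String> result = new TreeSet<>();
        for (String table : state) {
            if (included.contains(table)) {
                result.add(table);
            }
        }
        return result;
    }

    // Stand-in for "the number of split infos on each side" as the completion indicator.
    static boolean countSaysComplete(Set<String> enumerator, Set<String> reader) {
        return enumerator.size() == reader.size();
    }

    // Hypothetical metadata checksum: CRC32 over the table names in sorted order.
    static long checksum(Set<String> infos) {
        CRC32 crc = new CRC32();
        for (String table : new TreeSet<>(infos)) {
            crc.update(table.getBytes(StandardCharsets.UTF_8));
            crc.update(0); // separator, so {"AB"} and {"A", "B"} hash differently
        }
        return crc.getValue();
    }

    public static void main(String[] args) {
        // Steps 1-2: tables A and B are captured; both sides agree.
        Set<String> enumerator = new TreeSet<>(List.of("A", "B"));
        Set<String> reader = new TreeSet<>(List.of("A", "B"));

        // Steps 3-6: table A is dropped; the include list still contains A and B.
        Set<String> included = Set.of("A", "B");
        Set<String> existing = Set.of("B");
        enumerator = enumeratorPrune(enumerator, included, existing);
        reader = readerPrune(reader, included);
        System.out.println("after drop: enumerator=" + enumerator + " reader=" + reader);

        // Steps 7-10: table C is added to the configuration and snapshotted by the enumerator.
        included = Set.of("A", "B", "C");
        existing = Set.of("B", "C");
        enumerator = enumeratorPrune(enumerator, included, existing);
        enumerator.add("C");
        reader = readerPrune(reader, included);
        System.out.println("after add: enumerator=" + enumerator + " reader=" + reader);

        // Both sides hold two infos, so the count-based check wrongly reports completion.
        System.out.println("count check says complete: " + countSaysComplete(enumerator, reader));

        // The checksums differ, so the replication starts over from the enumerator's state.
        if (checksum(reader) != checksum(enumerator)) {
            reader = new TreeSet<>(enumerator);
        }
        System.out.println("after checksum check: reader=" + reader);
    }
}
```

Running `main` prints the enumerator holding `[B, C]` and the reader holding `[A, B]`, a count check of `true`, and finally a reader holding `[B, C]`. Comparing a checksum of the contents rather than the sizes is what lets the reader detect that it holds the stale info for A instead of the info for C.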