Sergei Morozov created FLINK-38270:
--------------------------------------

Summary: MySQL CDC source may ignore newly added tables while reading the binlog (scenario 2)
Key: FLINK-38270
URL: https://issues.apache.org/jira/browse/FLINK-38270
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: Sergei Morozov
The logic that removes finished snapshot split infos for tables that no longer exist is inconsistent between the enumerator and the source reader. This may lead to a situation where the infos of an old table are retained on the reader, but the infos of a new one aren’t transmitted to it. As a result, the binlog events for the newly added table are skipped.

h3. Discrepancy in the info subtraction logic

When a table is no longer captured by the connector, it’s removed from both the enumerator’s and the reader’s state.

h4. Enumerator

# Discover the tables that match the include list.
# Compare them with the tables from the state.
# Remove the state that corresponds to the tables that are no longer captured.

As a result:
# If the table is no longer included in the configuration, it’s *removed* from the state.
# If the table no longer exists, it’s *also removed* from the state.

h4. Reader

# Iterate over the finished snapshot split infos.
# Remove all infos whose tables are no longer included in the configuration.

As a result:
# If the table is no longer included in the configuration, it’s *removed* from the state.
# If the table no longer exists, it is _*not* removed_ from the state, because the reader doesn’t know that the table no longer exists.

h3. Impact of the discrepancy on the binlog split metadata transmission

The transmission logic uses the number of split infos on each side as the indicator of completion, and it relies on the no longer relevant infos being subtracted consistently on both sides. Because an info that’s subtracted from the enumerator’s state may remain in the reader’s state, the two counts can match even though the sets of tables differ. In that case, the info of a newly snapshotted table is never transmitted to the reader.

h3. Steps to reproduce

# Create a source connection that captures tables {{A}} and {{B}}.
# Start the connection and wait until it reaches the steady state.
# Stop the connection.
# Drop table {{A}} in the source database.
# Start the connection. The connectivity test should be disabled.
# Observe the state of the enumerator and the reader:
## The enumerator’s finished split infos will contain only {{B}} ({{A}} no longer exists, so it’s subtracted from the state).
## The reader’s finished split infos will contain {{A}} and {{B}} (both still match the include list).
# Stop the connection.
# Add a stream mapping for table {{C}}. Note that at this point the connection can be updated only via the CLI or the API; the UI won't allow you to keep the stream mapping for {{A}}.
# Start the connection. The connectivity test should still be disabled.
# Observe the state of the enumerator and the reader:
## The enumerator’s finished split infos will contain {{B}} and {{C}} (2 in total).
## The reader’s finished split infos will contain {{A}} and {{B}} (both still match the include list; also 2 in total).
# Make data changes in {{C}} and confirm that they are not captured.
# Confirm that the logs contain the following message:
> Following expected tables are not being read from binlog [<database-name>.c]

h3. Observing state

h4. Enumerator

# Set a breakpoint on {{MySqlSourceEnumerator#snapshotState()}}
# Evaluate {{((MySqlHybridSplitAssigner)splitAssigner).snapshotSplitAssigner.splitFinishedOffsets}}

h4. Reader

# Set a breakpoint on {{MySqlSplitReader#fetch()}}
# Evaluate {{((MySqlBinlogSplit)((BinlogSplitReader)this.currentReader).currentBinlogSplit).finishedSnapshotSplitInfos}}
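The subtraction discrepancy and the count-based completion check described in this issue can be illustrated with a minimal, self-contained Java model of the two restarts from the steps to reproduce. This is a sketch under stated assumptions, not Flink CDC code. Each table name stands for one finished snapshot split info. The class and method names ({{SplitInfoDiscrepancySketch}}, {{enumeratorRetain}}, {{readerRetain}}, {{transmit}}, {{simulate}}) are hypothetical. {{transmit}} assumes that the reader requests only the infos past the number it already holds, which is one way the count-based completion described above could behave.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/**
 * Hypothetical model of the bookkeeping described in FLINK-38270. Each table name stands
 * for one finished snapshot split info. None of these names are Flink CDC APIs.
 */
public class SplitInfoDiscrepancySketch {

    /** Enumerator rule: keep only infos whose tables were discovered (included AND existing). */
    static Set<String> enumeratorRetain(Set<String> infos, Set<String> discoveredTables) {
        Set<String> retained = new LinkedHashSet<>(infos);
        retained.retainAll(discoveredTables);
        return retained;
    }

    /** Reader rule: keep infos whose tables match the include list; dropped tables stay. */
    static Set<String> readerRetain(Set<String> infos, Predicate<String> includeList) {
        Set<String> retained = new LinkedHashSet<>();
        for (String table : infos) {
            if (includeList.test(table)) {
                retained.add(table);
            }
        }
        return retained;
    }

    /**
     * Assumed count-based transmission: the reader requests only the infos past the number
     * it already holds, so it treats itself as complete once the two counts are equal.
     */
    static Set<String> transmit(List<String> enumeratorInfos, Set<String> readerInfos) {
        Set<String> result = new LinkedHashSet<>(readerInfos);
        for (int i = readerInfos.size(); i < enumeratorInfos.size(); i++) {
            result.add(enumeratorInfos.get(i));
        }
        return result;
    }

    /** Replays the two restarts from the steps to reproduce; returns the reader's final infos. */
    static Set<String> simulate(boolean readerKnowsAboutDrops) {
        Set<String> initial = new LinkedHashSet<>(List.of("A", "B"));

        // Restart 1: A was dropped, but the include list still names A and B.
        Predicate<String> includeAB = t -> t.equals("A") || t.equals("B");
        Set<String> discovered1 = Set.of("B");
        Set<String> enumerator = enumeratorRetain(initial, discovered1);
        Set<String> reader = readerKnowsAboutDrops
                ? enumeratorRetain(initial, discovered1)
                : readerRetain(initial, includeAB);

        // Restart 2: a mapping for C is added (A's mapping is kept) and C gets snapshotted.
        Predicate<String> includeABC = includeAB.or(t -> t.equals("C"));
        Set<String> discovered2 = Set.of("B", "C");
        enumerator = enumeratorRetain(enumerator, discovered2);
        enumerator.add("C"); // C's snapshot split finishes on the enumerator side
        reader = readerKnowsAboutDrops
                ? enumeratorRetain(reader, discovered2)
                : readerRetain(reader, includeABC);

        return transmit(new ArrayList<>(enumerator), reader);
    }

    public static void main(String[] args) {
        Set<String> expected = Set.of("B", "C");
        for (boolean consistent : new boolean[] {false, true}) {
            Set<String> reader = simulate(consistent);
            Set<String> missing = new LinkedHashSet<>(expected);
            missing.removeAll(reader);
            System.out.println("consistent=" + consistent + " reader=" + reader + " missing=" + missing);
        }
    }
}
```

Running {{main}} prints {{consistent=false reader=[A, B] missing=[C]}} followed by {{consistent=true reader=[B, C] missing=[]}}. With the reader's current subtraction rule, both sides hold two infos, so nothing is requested and {{C}} never reaches the reader. If the reader dropped {{A}} the way the enumerator does, the resulting count mismatch would cause {{C}} to be transmitted.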