[ https://issues.apache.org/jira/browse/FLINK-38334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-38334:
-----------------------------------
    Labels: pull-request-available  (was: )

> MySQL CDC source may get stuck in the INITIAL_ASSIGNING state
> -------------------------------------------------------------
>
>                 Key: FLINK-38334
>                 URL: https://issues.apache.org/jira/browse/FLINK-38334
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.2.0
>            Reporter: Sergei Morozov
>            Priority: Major
>              Labels: pull-request-available
>
> The logic that filters out irrelevant snapshot splits is inconsistent between 
> the enumerator and the source reader:
>  # The enumerator filters them out only if the assigner is in the 
> {{INITIAL_ASSIGNING_FINISHED}} or the {{NEWLY_ADDED_ASSIGNING_FINISHED}} 
> state 
> ([source|https://github.com/apache/flink-cdc/blob/release-3.2.0/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java#L219]).
>  # The source reader filters them out regardless of the assigner state.
> As a result, if a table is excluded from the source configuration before its 
> snapshot has completed, the source reader drops the table's remaining splits 
> while the enumerator still expects them to be processed by the source reader 
> and reported back. Eventually, the source gets stuck in the snapshotting phase 
> and never transitions to streaming.
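> To make the mismatch concrete, the following minimal Java sketch models the two 
> filters as plain predicates. It is a hypothetical model, not the Flink CDC code: 
> {{SplitFilterModel}}, {{enumeratorKeeps}}, {{readerKeeps}} and {{disagrees}} are 
> illustrative names. The only behavior it assumes is the one described above: the 
> enumerator filters only in the two {{*_FINISHED}} states, and the reader always filters.

```java
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical model of the two split filters described in the issue.
// The names are illustrative and are NOT actual Flink CDC classes or methods.
class SplitFilterModel {

    enum AssignerStatus {
        INITIAL_ASSIGNING,
        INITIAL_ASSIGNING_FINISHED,
        NEWLY_ADDED_ASSIGNING,
        NEWLY_ADDED_ASSIGNING_FINISHED
    }

    /** Enumerator side: drops splits of excluded tables only in the two *_FINISHED states. */
    static boolean enumeratorKeeps(String tableId, AssignerStatus status, Predicate<String> tableFilter) {
        boolean finished = status == AssignerStatus.INITIAL_ASSIGNING_FINISHED
                || status == AssignerStatus.NEWLY_ADDED_ASSIGNING_FINISHED;
        return !finished || tableFilter.test(tableId);
    }

    /** Reader side: drops splits of excluded tables regardless of the assigner state. */
    static boolean readerKeeps(String tableId, Predicate<String> tableFilter) {
        return tableFilter.test(tableId);
    }

    /** True when the enumerator still waits for a split that the reader silently drops. */
    static boolean disagrees(String tableId, AssignerStatus status, Predicate<String> tableFilter) {
        return enumeratorKeeps(tableId, status, tableFilter) && !readerKeeps(tableId, tableFilter);
    }

    public static void main(String[] args) {
        // Table A has been excluded from the configuration; only B is still captured.
        Set<String> captured = Set.of("flink_38334.b");
        Predicate<String> filter = captured::contains;
        for (AssignerStatus status : AssignerStatus.values()) {
            System.out.printf("%-31s A disagrees=%b, B disagrees=%b%n",
                    status,
                    disagrees("flink_38334.a", status, filter),
                    disagrees("flink_38334.b", status, filter));
        }
    }
}
```

> With table {{A}} excluded, the model reports a disagreement for {{A}} in 
> {{INITIAL_ASSIGNING}}, which is the state from this issue. Taken literally, the 
> stated rule also flags {{NEWLY_ADDED_ASSIGNING}}. The reproduction below covers 
> only the initial case.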
> h5. Steps to reproduce
>  # Prepare tables {{A}} and {{B}} to be captured by the source. The number of 
> rows in {{B}} is irrelevant; table {{A}} needs enough rows for its snapshot to 
> take a couple of minutes (100 rows should be enough given the configuration 
> below).
>  # Make sure the source has the following configuration parameters:
>  ## {{scan.snapshot.fetch.size: 2}}. This will guarantee that the snapshot of 
> table {{A}} will take long enough ({{2}} is the minimum allowed value).
>  # Include tables {{A}} and {{B}} in the source's configuration.
>  # Start the job and wait until it starts snapshotting {{A}}.
>  # Stop the job.
>  # Exclude table {{A}} from the configuration.
>  # Start the job.
>  # Observe that the source reader drops the splits that belong to {{A}} (it 
> should log an "is skipping split" {{INFO}} message for each). At the same 
> time, the enumerator still expects them to be finished and reported.
>  # Make changes in table {{B}} (e.g. insert a row).
>  # Observe that this change never makes its way downstream.
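> The steps above can be sketched as a small Java simulation of the split 
> bookkeeping across the restart. Everything in it is hypothetical 
> ({{RestartSimulation}}, {{pendingAfterRestart}}). The 50 chunks for {{A}} come 
> from the enumerator log below. The assumption that exactly one {{A}} split 
> ({{flink_38334.a:0}}) was finished before the stop is made only for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical simulation of the reproduction steps; the names are illustrative,
// not Flink CDC APIs. Split IDs follow the "<table>:<chunk>" format seen in the logs.
class RestartSimulation {

    /** Returns the split IDs the enumerator is still waiting for after the reader has run. */
    static Set<String> pendingAfterRestart(int chunksOfA, int finishedBeforeStop, Predicate<String> tableFilter) {
        // The enumerator splits A into chunksOfA chunks and B into a single chunk.
        List<String> allSplits = new ArrayList<>();
        for (int i = 0; i < chunksOfA; i++) {
            allSplits.add("flink_38334.a:" + i);
        }
        allSplits.add("flink_38334.b:0");

        // Before the job is stopped, the first few A splits are finished and acknowledged.
        Set<String> finished = new HashSet<>();
        for (int i = 0; i < finishedBeforeStop; i++) {
            finished.add("flink_38334.a:" + i);
        }

        // After the restart the assigner is still INITIAL_ASSIGNING, so the enumerator
        // assigns every remaining split; the reader skips those of excluded tables.
        for (String splitId : allSplits) {
            if (finished.contains(splitId)) {
                continue;
            }
            String tableId = splitId.substring(0, splitId.lastIndexOf(':'));
            if (tableFilter.test(tableId)) {
                finished.add(splitId); // processed and reported back to the enumerator
            }
            // otherwise: "is skipping split ... because it does not match new table filter"
        }

        Set<String> pending = new HashSet<>(allSplits);
        pending.removeAll(finished);
        return pending;
    }

    public static void main(String[] args) {
        // Only table B remains in the configuration after the restart.
        Set<String> pending = pendingAfterRestart(50, 1, "flink_38334.b"::equals);
        // A non-empty set means the snapshot phase never completes, so streaming never starts.
        System.out.println(pending.size() + " splits still pending; streaming starts: " + pending.isEmpty());
    }
}
```

> With {{A}} excluded, the remaining {{A}} splits stay pending forever. The enumerator 
> therefore never considers the snapshot phase complete, and the change made in 
> {{B}} never reaches downstream.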
> h5. Relevant log messages
> The enumerator splits tables into chunks:
> {noformat}
> [15:05:07.667] [INFO] 
> [org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner]
> Split table flink_38334.a into 50 chunks, time cost: 289ms.
> [15:05:07.818] [INFO] 
> [org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner]
> Split table flink_38334.b into 1 chunks, time cost: 150ms.
> {noformat}
> Splits of table {{A}} are processed successfully:
> {noformat}
> [15:06:06.840] [INFO] 
> [org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader]
> Source reader 0 reports offsets of finished snapshot splits 
> {flink_38334.a:0={ts_sec=0, file=mysql-bin-changelog.144529, pos=157, 
> kind=SPECIFIC, gtids=, row=0, event=0}}.
> [15:06:06.844] [INFO] 
> [org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator]
> The enumerator under INITIAL_ASSIGNING receives finished split offsets 
> FinishedSnapshotSplitsReportEvent{finishedOffsets={flink_38334.a:0={ts_sec=0, 
> file=mysql-bin-changelog.144529, pos=157, kind=SPECIFIC, gtids=, row=0, 
> event=0}}} from subtask 0.
> [15:06:06.845] [INFO] 
> [org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator]
> Sending an acknowledgement of finished split offsets 
> FinishedSnapshotSplitsAckEvent{finishedSplits=[flink_38334.a:0]} to subtask 0.
> {noformat}
> After the source has been restarted with table {{A}} no longer included in the 
> configuration:
> {noformat}
> [15:08:56.426] [INFO] 
> [org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator]
> The enumerator assigns split MySqlSnapshotSplit{tableId=flink_38334.a, 
> splitId='flink_38334.a:9', splitKeyType=[`id` INT NOT NULL], splitStart=[19], 
> splitEnd=[21], highWatermark=null} to subtask 0
> [15:08:56.504] [INFO] 
> [org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader]
> Source reader 0 adds split MySqlSnapshotSplit{tableId=flink_38334.a, 
> splitId='flink_38334.a:9', splitKeyType=[`id` INT NOT NULL], splitStart=[19], 
> splitEnd=[21], highWatermark=null}
> [15:08:56.505] [INFO] 
> [org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader]
> The subtask 0 is skipping split flink_38334.a:9 because it does not match new 
> table filter.
> {noformat}
> Note that the following messages are never logged:
>  # "Snapshot split assigner received all splits finished"
>  # "Assigner status changes from INITIAL_ASSIGNING to 
> INITIAL_ASSIGNING_FINISHED"
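> A hypothetical Java model of the assigner's status gate shows why those two 
> messages never appear. It assumes that the transition out of {{INITIAL_ASSIGNING}} 
> happens only once every assigned split has been reported as finished. 
> {{AssignerStatusGate}} and its methods are illustrative names, not the actual 
> {{MySqlSnapshotSplitAssigner}} API. Only the quoted message text is taken from the issue.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the assigner's status gate; NOT the actual MySqlSnapshotSplitAssigner code.
// The printed messages mirror the ones quoted in the issue; the surrounding logic is an assumption.
class AssignerStatusGate {

    enum Status { INITIAL_ASSIGNING, INITIAL_ASSIGNING_FINISHED }

    private final Set<String> expected;
    private final Set<String> finished = new HashSet<>();
    private Status status = Status.INITIAL_ASSIGNING;

    AssignerStatusGate(List<String> expectedSplitIds) {
        this.expected = new HashSet<>(expectedSplitIds);
    }

    /** Models the enumerator handling a finished-splits report from a source reader. */
    void onFinishedSplits(List<String> splitIds) {
        finished.addAll(splitIds);
        // The status changes only when every expected split has been reported.
        if (status == Status.INITIAL_ASSIGNING && finished.containsAll(expected)) {
            System.out.println("Snapshot split assigner received all splits finished");
            System.out.println("Assigner status changes from INITIAL_ASSIGNING to INITIAL_ASSIGNING_FINISHED");
            status = Status.INITIAL_ASSIGNING_FINISHED;
        }
    }

    Status status() {
        return status;
    }

    public static void main(String[] args) {
        AssignerStatusGate gate = new AssignerStatusGate(List.of("flink_38334.a:9", "flink_38334.b:0"));
        // The reader reports B's split but silently skips A's, so neither message is printed.
        gate.onFinishedSplits(List.of("flink_38334.b:0"));
        System.out.println("status = " + gate.status());
    }
}
```

> Because the reader never reports the skipped {{A}} splits, {{containsAll}} stays 
> false and the status remains {{INITIAL_ASSIGNING}}.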



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
