[ https://issues.apache.org/jira/browse/FLINK-38334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-38334:
-----------------------------------
Labels: pull-request-available (was: )
> MySQL CDC source may get stuck in the INITIAL_ASSIGNING state
> -------------------------------------------------------------
>
> Key: FLINK-38334
> URL: https://issues.apache.org/jira/browse/FLINK-38334
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.2.0
> Reporter: Sergei Morozov
> Priority: Major
> Labels: pull-request-available
>
> The logic for filtering out irrelevant snapshot splits is inconsistent between
> the enumerator and the source reader:
> # The enumerator filters them out only if the assigner is in the
> {{INITIAL_ASSIGNING_FINISHED}} or the {{NEWLY_ADDED_ASSIGNING_FINISHED}}
> state
> ([source|https://github.com/apache/flink-cdc/blob/release-3.2.0/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java#L219]).
> # The source reader filters them out regardless of the assigner state.
> As a result, if a table is excluded from the source configuration before its
> snapshot has completed, the source reader drops its remaining splits, while the
> enumerator still expects them to be processed by the source reader and reported
> back. Because these splits are never reported, the source stays in the
> snapshotting phase and never transitions to streaming.
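The two filtering rules above can be modeled in a few lines of Java. This is a simplified, hypothetical sketch, not the Flink CDC implementation: the class, enum, and method names are invented for illustration, and only the assigner states named in this issue are modeled. It flags a mismatch whenever the source reader drops a split of an excluded table that the enumerator has not filtered out.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the two filtering rules; not the Flink CDC implementation.
public class SplitFilterMismatch {

    // Only the assigner states named in this issue (hypothetical simplified enum).
    enum AssignerState {
        INITIAL_ASSIGNING,
        INITIAL_ASSIGNING_FINISHED,
        NEWLY_ADDED_ASSIGNING_FINISHED
    }

    // States in which the enumerator filters out splits of excluded tables.
    static final Set<AssignerState> ENUMERATOR_FILTER_STATES = EnumSet.of(
            AssignerState.INITIAL_ASSIGNING_FINISHED,
            AssignerState.NEWLY_ADDED_ASSIGNING_FINISHED);

    // Enumerator: drops a split of an excluded table only in the states above.
    static boolean enumeratorDrops(AssignerState state, boolean tableIncluded) {
        return !tableIncluded && ENUMERATOR_FILTER_STATES.contains(state);
    }

    // Source reader: drops a split of an excluded table regardless of the state.
    static boolean readerDrops(boolean tableIncluded) {
        return !tableIncluded;
    }

    // True when the reader drops a split that the enumerator still expects back.
    static boolean mismatch(AssignerState state, boolean tableIncluded) {
        return readerDrops(tableIncluded) && !enumeratorDrops(state, tableIncluded);
    }

    public static void main(String[] args) {
        for (AssignerState state : AssignerState.values()) {
            System.out.printf("%s, excluded table -> mismatch=%b%n",
                    state, mismatch(state, false));
        }
    }
}
```

In this model the mismatch appears only in {{INITIAL_ASSIGNING}}, the state named in the issue title; in both "finished" states the two sides agree.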
> h5. Steps to reproduce
> # Prepare tables {{A}} and {{B}} to be captured by the source. The number of
> rows in {{B}} is irrelevant; table {{A}} needs enough rows for its snapshot to
> take a couple of minutes (100 rows should be enough given the configuration
> below).
> # Make sure the source has the following configuration parameter:
> ## {{scan.snapshot.fetch.size: 2}}. This ensures that the snapshot of table
> {{A}} takes long enough ({{2}} is the minimum allowed value).
> # Include tables {{A}} and {{B}} in the source's configuration.
> # Start the job and wait until it starts snapshotting {{A}}.
> # Stop the job.
> # Exclude table {{A}} from the configuration.
> # Start the job.
> # Observe that the source reader drops the splits that belong to {{A}} (it
> logs an "is skipping split" {{INFO}} message for each). At the same time, the
> enumerator still expects them to be finished and reported.
> # Make changes in table {{B}} (e.g. insert a row).
> # Observe that this change never makes its way downstream.
> h5. Relevant log messages
> The enumerator splits tables into chunks:
> {noformat}
> [15:05:07.667] [INFO]
> [org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner]
> Split table flink_38334.a into 50 chunks, time cost: 289ms.
> [15:05:07.818] [INFO]
> [org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner]
> Split table flink_38334.b into 1 chunks, time cost: 150ms.
> {noformat}
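The chunk count above and the split boundaries logged after the restart (for example {{splitStart=[19]}}, {{splitEnd=[21]}} for {{flink_38334.a:9}}) are consistent with evenly sized chunks of two keys over ids 1 to 100. The hypothetical sketch below reproduces that arithmetic. The id range, the chunk size of 2, and the chunking scheme itself are assumptions for illustration; the report states none of them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reconstruction of evenly sized chunk boundaries.
// Assumes ids 1..100 and a chunk size of 2; neither value is stated in the report.
public class ChunkBoundaries {

    // A chunk [start, end); null means unbounded on that side.
    record Chunk(Integer start, Integer end) {}

    // Splits the key range [min, max] into chunks of chunkSize keys each.
    static List<Chunk> split(int min, int max, int chunkSize) {
        List<Chunk> chunks = new ArrayList<>();
        Integer start = null; // the first chunk is open on the left
        int end = min + chunkSize;
        while (end <= max) {
            chunks.add(new Chunk(start, end));
            start = end;
            end += chunkSize;
        }
        chunks.add(new Chunk(start, null)); // the last chunk is open on the right
        return chunks;
    }

    public static void main(String[] args) {
        List<Chunk> chunks = split(1, 100, 2);
        System.out.println(chunks.size() + " chunks");  // 50 chunks
        System.out.println("chunk 9: " + chunks.get(9)); // chunk 9: Chunk[start=19, end=21]
    }
}
```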
> Splits of table {{A}} are processed successfully:
> {noformat}
> [15:06:06.840] [INFO]
> [org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader]
> Source reader 0 reports offsets of finished snapshot splits
> {flink_38334.a:0={ts_sec=0, file=mysql-bin-changelog.144529, pos=157,
> kind=SPECIFIC, gtids=, row=0, event=0}}.
> [15:06:06.844] [INFO]
> [org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator]
> The enumerator under INITIAL_ASSIGNING receives finished split offsets
> FinishedSnapshotSplitsReportEvent{finishedOffsets={flink_38334.a:0={ts_sec=0,
> file=mysql-bin-changelog.144529, pos=157, kind=SPECIFIC, gtids=, row=0,
> event=0}}} from subtask 0.
> [15:06:06.845] [INFO]
> [org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator]
> Sending an acknowledgement of finished split offsets
> FinishedSnapshotSplitsAckEvent{finishedSplits=[flink_38334.a:0]} to subtask 0.
> {noformat}
> After the source has been restarted with table {{A}} no longer included in
> the configuration:
> {noformat}
> [15:08:56.426] [INFO]
> [org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator]
> The enumerator assigns split MySqlSnapshotSplit{tableId=flink_38334.a,
> splitId='flink_38334.a:9', splitKeyType=[`id` INT NOT NULL], splitStart=[19],
> splitEnd=[21], highWatermark=null} to subtask 0
> [15:08:56.504] [INFO]
> [org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader]
> Source reader 0 adds split MySqlSnapshotSplit{tableId=flink_38334.a,
> splitId='flink_38334.a:9', splitKeyType=[`id` INT NOT NULL], splitStart=[19],
> splitEnd=[21], highWatermark=null}
> [15:08:56.505] [INFO]
> [org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader]
> The subtask 0 is skipping split flink_38334.a:9 because it does not match new
> table filter.
> {noformat}
> Note that the following messages are never logged:
> # "Snapshot split assigner received all splits finished"
> # "Assigner status changes from INITIAL_ASSIGNING to
> INITIAL_ASSIGNING_FINISHED"
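The following hypothetical simulation illustrates why those messages never appear. It assumes, for illustration, that splits {{flink_38334.a:9}} through {{flink_38334.a:49}} and {{flink_38334.b:0}} are still outstanding at restart (the logs only show that {{flink_38334.a:9}} is reassigned), and it models only the {{INITIAL_ASSIGNING}} case, in which the enumerator does not filter splits. The names are illustrative and do not correspond to Flink CDC classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical simulation of the restart scenario; not Flink CDC code.
public class StuckAssignerSimulation {

    // Enumerator-side bookkeeping: splits assigned but not yet reported as finished.
    static final class Assigner {
        private final Set<String> pending = new LinkedHashSet<>();
        private boolean initialAssigningFinished = false;

        void assign(String splitId) {
            pending.add(splitId);
        }

        void onFinished(String splitId) {
            pending.remove(splitId);
            // The state changes only once every assigned split has been reported.
            if (pending.isEmpty()) {
                initialAssigningFinished = true;
            }
        }

        boolean isInitialAssigningFinished() {
            return initialAssigningFinished;
        }

        int pendingCount() {
            return pending.size();
        }
    }

    // Reader-side: reads and reports a split only if its table passes the filter.
    static void readSplit(String splitId, Predicate<String> tableFilter, Assigner assigner) {
        String table = splitId.substring(0, splitId.lastIndexOf(':'));
        if (tableFilter.test(table)) {
            assigner.onFinished(splitId); // read the chunk, then report it back
        }
        // Otherwise the split is skipped and never reported to the enumerator.
    }

    // Runs the post-restart phase with the given table filter.
    static Assigner runAfterRestart(Predicate<String> tableFilter) {
        List<String> outstanding = new ArrayList<>();
        for (int i = 9; i < 50; i++) {
            outstanding.add("flink_38334.a:" + i); // assumed outstanding A splits
        }
        outstanding.add("flink_38334.b:0");

        Assigner assigner = new Assigner();
        outstanding.forEach(assigner::assign); // enumerator does not filter here
        for (String splitId : outstanding) {
            readSplit(splitId, tableFilter, assigner);
        }
        return assigner;
    }

    public static void main(String[] args) {
        // Table A has been excluded from the configuration.
        Assigner assigner = runAfterRestart("flink_38334.b"::equals);
        System.out.println("pending splits: " + assigner.pendingCount());
        System.out.println("INITIAL_ASSIGNING_FINISHED reached: "
                + assigner.isInitialAssigningFinished());
    }
}
```

With {{A}} excluded, the 41 assumed {{A}} splits stay pending and the flag never flips. Passing a filter that accepts every table (for example {{t -> true}}) drains the pending set and flips the flag, which is the behavior the enumerator expects.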
--
This message was sent by Atlassian Jira
(v8.20.10#820010)