[
https://issues.apache.org/jira/browse/FLINK-39478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083162#comment-18083162
]
Jubin Soni edited comment on FLINK-39478 at 5/24/26 3:27 PM:
-------------------------------------------------------------
I have created a PR that addresses fixes 1 and 2 suggested above:
https://github.com/apache/flink-cdc/pull/4410
was (Author: JIRAUSER313426):
I have created a PR that addresses fixes 1 and 2 suggested above:
https://github.com/apache/flink/pull/28241
> Newly-added tables become ghost tables after job restart during initial
> snapshot phase
> --------------------------------------------------------------------------------------
>
> Key: FLINK-39478
> URL: https://issues.apache.org/jira/browse/FLINK-39478
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Environment: * Flink CDC version: 3.4.0 (but also seen on the latest version)
> * {{scan.newly-added-table.enabled: true}}
> * Checkpoint interval: 5 minutes, exactly-once, RocksDB backend
> * Parallelism: 4
> * Restarts triggered by both external savepoint-cancel and pod
> replacement/checkpoint-restore
> Reporter: Vinay Sagar Gonabavi
> Priority: Critical
> Labels: pull-request-available
>
> *Summary*
> When {{scan.newly-added-table.enabled=true}} and a job restarts
> (savepoint-cancel or checkpoint-restore) while a newly-added table is
> mid-snapshot, the table can silently stop emitting events. The table remains
> in the pipeline config and the job runs healthily for all other tables, but
> the affected table produces zero data indefinitely.
> *Production Evidence*
> Two tables independently exhibited this behavior:
> ||Table||Rows||Snapshot duration||Paimon snapshots before death||Death trigger||Data after restart||
> |{{table_1}}|100M+|multi-day|96 over 2 days|savepoint-cancel restart (Feb 27)|zero|
> |{{table_2}}|100M+|multi-day|201 over 1.5 days|survived 1st restart, died on 2nd|zero|
> * Job continues running healthily for all other tables (confirmed by Paimon
> counts and operator metrics)
> * ~60K records still flowing are all from working tables
> * Both tables confirmed mid-snapshot at time of restart via S3 data
> Notably, {{table_2}} survived one restart and died on the second. This
> non-determinism points to a timing/race condition in the recovery path rather
> than a guaranteed failure. The outcome depends on exactly which state the
> assigner was in at the checkpoint boundary.
> *Root Cause Analysis*
> The Flink CDC MySQL source uses a finite state machine
> ({{AssignerStatus}}) to manage newly-added table snapshots. The state
> progression is:
>
> {code:java}
> INITIAL_ASSIGNING_FINISHED (1)
> → NEWLY_ADDED_ASSIGNING (2) // snapshot splits being assigned
> → NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED (3) // all splits done, awaiting binlog update
> → NEWLY_ADDED_ASSIGNING_FINISHED (4) // binlog handshake complete{code}
> We were unable to determine the exact failing code path due to limited
> observability into the assigner state at the time of the restarts. However,
> code analysis reveals that a restart during states 2 or 3 exposes multiple
> fragile recovery windows, any of which could permanently stall the state
> machine:
> *1. Transient {{checkpointIdToFinish}} field
> (MySqlSnapshotSplitAssigner.java)*
> This field gates the {{NEWLY_ADDED_ASSIGNING →
> NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED}} transition but is *not
> checkpointed*. After restore it is {{null}}, and re-deriving it takes 2
> additional checkpoint cycles (one {{snapshotState()}} to set it, one
> {{notifyCheckpointComplete()}} to act on it). If the job restarts again
> within those 2 cycles, the window resets. This could explain why {{table_2}}
> survived one restart but not the second: the first restart may have landed on
> a clean boundary, while the second hit before the 2-cycle recovery completed.
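The gating described in item 1 can be illustrated with a toy model. This is a minimal sketch and not the actual {{MySqlSnapshotSplitAssigner}} code: the class names {{ToyAssigner}} and {{TransientGateDemo}}, the {{restore()}} helper, and the {{allSplitsFinished}} parameter are all hypothetical and exist only to show how a field that is not checkpointed forces the gate to be re-armed after every restart.

```java
// Toy model of the gate described in item 1; NOT the real Flink CDC assigner.
final class ToyAssigner {
    // Deliberately excluded from the "persisted" state, as the report describes.
    private Long checkpointIdToFinish;
    // Stands in for the persisted assigner status.
    private boolean snapshotFinished;

    // Arms the gate at checkpoint time once all snapshot splits are done.
    void snapshotState(long checkpointId, boolean allSplitsFinished) {
        if (checkpointIdToFinish == null && allSplitsFinished) {
            checkpointIdToFinish = checkpointId;
        }
    }

    // Advances the status only if the gate was armed in this incarnation.
    void notifyCheckpointComplete(long checkpointId) {
        if (checkpointIdToFinish != null && checkpointId >= checkpointIdToFinish) {
            snapshotFinished = true;
        }
    }

    boolean isSnapshotFinished() {
        return snapshotFinished;
    }

    // Hypothetical restore: the status survives, the transient gate does not.
    ToyAssigner restore() {
        ToyAssigner restored = new ToyAssigner();
        restored.snapshotFinished = this.snapshotFinished;
        return restored;
    }
}

final class TransientGateDemo {
    public static void main(String[] args) {
        ToyAssigner assigner = new ToyAssigner();
        assigner.snapshotState(10L, true);        // gate armed at checkpoint 10
        assigner = assigner.restore();            // restart before it is confirmed
        assigner.notifyCheckpointComplete(10L);   // gate is gone, nothing happens
        System.out.println("after restart: " + assigner.isSnapshotFinished());

        assigner.snapshotState(11L, true);        // gate re-armed
        assigner.notifyCheckpointComplete(11L);   // transition can now fire
        System.out.println("after re-arming: " + assigner.isSnapshotFinished());
    }
}
```

In this model, every restart that lands between arming and confirmation discards the gate, which matches the "window resets" behavior described above. Persisting the field (suggested fix 1) would amount to copying it in {{restore()}}.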
> *2. Multi-step binlog update handshake between states 3→4*
> (*MySqlSourceEnumerator.java*)
> The transition from state 3 to 4 requires: enumerator sends
> {{BinlogSplitUpdateRequestEvent}} → reader suspends binlog → reader requests
> latest finished splits number → enumerator responds → reader updates binlog
> split → reader sends {{BinlogSplitUpdateAckEvent}} →
> {{onBinlogSplitUpdated()}}. A restart during this handshake restores
> state 3, and {{MySqlHybridSplitAssigner.getNext()}} returns
> {{Optional.empty()}} for all split requests until
> {{requestBinlogSplitUpdateIfNeed()}} re-triggers the handshake via
> {{syncWithReaders()}}.
> *3. {{captureNewlyAddedTables()}} guard skips re-discovery*
> (*MySqlSnapshotSplitAssigner.java*)
> {code:java}
> if (... && AssignerStatus.isAssigningFinished(assignerStatus)) { {code}
> This only passes for states 1 and 4. If the restored state is 2 or 3, the
> entire newly-added table re-discovery is skipped. The code assumes
> in-progress work will resume from checkpoint state, but as described above,
> the recovery path depends on transient fields and multi-step coordination
> that may not complete before the next restart.
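To make the effect of the guard in item 3 concrete, the sketch below models the four states listed earlier and a finished-check with the semantics the report describes (true only for states 1 and 4). {{ToyStatus}} and {{GuardDemo}} are hypothetical names; the real {{AssignerStatus}} in Flink CDC may define additional states and helpers that are not modeled here.

```java
import java.util.ArrayList;
import java.util.List;

// Toy copy of the four states from the report, with their numeric codes.
enum ToyStatus {
    INITIAL_ASSIGNING_FINISHED(1),
    NEWLY_ADDED_ASSIGNING(2),
    NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED(3),
    NEWLY_ADDED_ASSIGNING_FINISHED(4);

    final int code;

    ToyStatus(int code) {
        this.code = code;
    }

    // Assumed semantics, per the report: only states 1 and 4 count as finished.
    static boolean isAssigningFinished(ToyStatus status) {
        return status == INITIAL_ASSIGNING_FINISHED
                || status == NEWLY_ADDED_ASSIGNING_FINISHED;
    }
}

final class GuardDemo {
    // Collects the codes of all restored states that would pass the guard.
    static List<Integer> codesThatAllowRediscovery() {
        List<Integer> codes = new ArrayList<>();
        for (ToyStatus status : ToyStatus.values()) {
            if (ToyStatus.isAssigningFinished(status)) {
                codes.add(status.code);
            }
        }
        return codes;
    }

    public static void main(String[] args) {
        for (ToyStatus status : ToyStatus.values()) {
            System.out.println(status + " -> re-discovery runs: "
                    + ToyStatus.isAssigningFinished(status));
        }
    }
}
```

Under these assumptions, a job restored in state 2 or 3 never re-enters table discovery and depends entirely on the checkpointed in-progress work resuming correctly.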
> *4. Existing test acknowledges this fragility*
> (*NewlyAddedTableITCase.java*):
> {code:java}
> // sleep 1s to wait for the assign status to INITIAL_ASSIGNING_FINISHED.
> // Otherwise, the restart job won't read newly added tables, and this test will be stuck.
> Thread.sleep(1000L); {code}
> The combination of transient fields, multi-step handshakes, and guarded
> re-discovery creates multiple windows where a restart can permanently stall
> the state machine. The non-deterministic behavior across our two affected
> tables is consistent with this: the outcome depends on the exact assigner
> state at the checkpoint boundary, which varies by timing.
> *Impact*
> * Large tables (100M+ rows) are disproportionately affected because their
> multi-day snapshot maximizes the window where a restart hits the vulnerable
> state
> * Silent failure - no errors logged, job appears healthy, only detectable by
> monitoring per-table output
> * Unrecoverable without dropping the table from the pipeline and re-adding
> it (full re-snapshot)
>
> *Suggested Fix Directions*
> # Persist {{checkpointIdToFinish}} in {{SnapshotPendingSplitsState}} so the
> state machine can advance immediately after restore without waiting for 2
> additional checkpoint cycles
> # Make the binlog update handshake (state 3→4) idempotent and automatically
> re-triggerable on restore, rather than relying on {{syncWithReaders()}} to
> eventually re-trigger it
> # Add a safety mechanism: if {{assignerStatus}} is in state 2 or 3 and no
> progress (split assignments or finished reports) occurs within N checkpoint
> cycles, log a warning and consider resetting the table's snapshot
> # Expose {{assignerStatus}} and per-table snapshot progress as JMX/Flink
> metrics - the lack of observability into the assigner state made this
> extremely difficult to diagnose and made it impossible to determine the exact
> failing code path
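Fix direction 3 could take roughly the following shape. This is only a sketch under stated assumptions: {{SnapshotStallWatchdog}}, its methods, and the {{inNewlyAddedPhase}} flag are hypothetical and do not exist in Flink CDC. It counts completed checkpoints during which the assigner sat in state 2 or 3 without any split assignment or finished report, and signals once a configurable limit N is reached.

```java
// Hypothetical stall detector for fix direction 3; not part of Flink CDC.
final class SnapshotStallWatchdog {
    private final int maxIdleCheckpoints; // the "N" from the suggestion
    private int idleCheckpoints;

    SnapshotStallWatchdog(int maxIdleCheckpoints) {
        if (maxIdleCheckpoints <= 0) {
            throw new IllegalArgumentException("maxIdleCheckpoints must be positive");
        }
        this.maxIdleCheckpoints = maxIdleCheckpoints;
    }

    // Call on every split assignment or finished-split report.
    void onProgress() {
        idleCheckpoints = 0;
    }

    // Call once per completed checkpoint. Returns true when the assigner has
    // been in state 2 or 3 for N consecutive checkpoints with no progress.
    boolean onCheckpointComplete(boolean inNewlyAddedPhase) {
        if (!inNewlyAddedPhase) {
            idleCheckpoints = 0;
            return false;
        }
        idleCheckpoints++;
        return idleCheckpoints >= maxIdleCheckpoints;
    }
}

final class WatchdogDemo {
    public static void main(String[] args) {
        SnapshotStallWatchdog watchdog = new SnapshotStallWatchdog(3);
        for (int checkpoint = 1; checkpoint <= 4; checkpoint++) {
            boolean stalled = watchdog.onCheckpointComplete(true);
            System.out.println("checkpoint " + checkpoint + " stalled=" + stalled);
        }
    }
}
```

A caller would log a warning when the method returns true. Whether to also reset the table's snapshot at that point is a policy decision the sketch leaves open.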
> *Workaround*
> When adding large tables with {{scan.newly-added-table.enabled}}, avoid
> restarting the job until the initial snapshot completes. If a restart occurs
> mid-snapshot and the table becomes a ghost, remove the table from the
> pipeline config, restart, then re-add it to force a fresh snapshot.