[ 
https://issues.apache.org/jira/browse/FLINK-39478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083160#comment-18083160
 ] 

Jubin Soni commented on FLINK-39478:
------------------------------------

[~vinaysagargonabavi] Thanks for the very thorough writeup - the analysis lines 
up with what I see on current {{master}}:
 * {{MySqlSnapshotSplitAssigner.checkpointIdToFinish}} is still {{@Nullable}} 
and is not threaded into {{SnapshotPendingSplitsState}}'s constructor in 
{{snapshotState()}}, so it is indeed lost across restore.
 * {{captureNewlyAddedTables()}} is still gated by 
{{AssignerStatus.isAssigningFinished(...)}}, which only matches states 1 
and 4, so a restart while in states 2 or 3 skips re-discovery as described.
 * The {{Thread.sleep(1000L)}} workaround in {{NewlyAddedTableITCase}} is still 
there, which is a pretty clear acknowledgment of the same fragility.
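
To make the second bullet concrete, here is a minimal standalone sketch of the guard. It is not the connector's code: {{AssignerStatusSketch}}, {{RediscoveryGuardSketch}} and the {{scanNewlyAddedEnabled}} flag are hypothetical stand-ins, and the status codes simply follow the 1 to 4 numbering from the issue description.

```java
// Simplified stand-in for AssignerStatus; codes follow the issue's numbering.
enum AssignerStatusSketch {
    INITIAL_ASSIGNING_FINISHED(1),
    NEWLY_ADDED_ASSIGNING(2),
    NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED(3),
    NEWLY_ADDED_ASSIGNING_FINISHED(4);

    private final int code;

    AssignerStatusSketch(int code) {
        this.code = code;
    }

    int code() {
        return code;
    }

    // Mirrors the behaviour described above: only states 1 and 4 match.
    static boolean isAssigningFinished(AssignerStatusSketch status) {
        return status == INITIAL_ASSIGNING_FINISHED
                || status == NEWLY_ADDED_ASSIGNING_FINISHED;
    }
}

final class RediscoveryGuardSketch {
    // Hypothetical helper: would a restore in this state re-run table discovery?
    // The first condition is an assumption standing in for the elided part of the real guard.
    static boolean wouldRediscover(boolean scanNewlyAddedEnabled, AssignerStatusSketch restored) {
        return scanNewlyAddedEnabled && AssignerStatusSketch.isAssigningFinished(restored);
    }
}
```

Under these assumptions, a restore in state 2 or 3 makes {{wouldRediscover}} return false, which is the skipped re-discovery from the report.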

On the four suggested fixes, I'd suggest splitting the work:
 # *Persist {{checkpointIdToFinish}} in {{SnapshotPendingSplitsState}}* — this 
looks like the most tractable change and the most likely root cause of the 
non-deterministic "survived one restart, died on the second" behavior. It's a 
localized state addition plus a {{PendingSplitsStateSerializer}} version bump 
with a backward-compat read path for older snapshots.
 # *Expose {{assignerStatus}} and per-table snapshot progress as Flink metrics* 
— low-risk and independently valuable; would have made this issue diagnosable 
in the first place.
 # The idempotent 3→4 handshake rework is higher blast radius (touches the 
enumerator↔reader RPC protocol) and probably deserves its own ticket.
 # A "no progress for N checkpoints" watchdog is reasonable but needs careful 
threshold tuning, given that snapshots on 100M+ row tables can 
legitimately go quiet for long stretches.
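
For (1), the backward-compatible read path could look roughly like the sketch below. This is a standalone illustration, not the actual {{PendingSplitsStateSerializer}}: the class name, the two version constants and the payload layout (a presence flag followed by the id) are all assumptions made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class CheckpointIdCodecSketch {
    // Hypothetical version numbers; the real serializer's versions may differ.
    static final int VERSION_WITHOUT_FIELD = 1;
    static final int VERSION_WITH_FIELD = 2;

    // Writes the nullable field as a presence flag followed by the value.
    static byte[] write(Long checkpointIdToFinish) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeBoolean(checkpointIdToFinish != null);
            if (checkpointIdToFinish != null) {
                out.writeLong(checkpointIdToFinish);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Older snapshots never wrote the field, so they restore it as null,
    // which matches the behaviour before the change.
    static Long read(int version, byte[] payload) {
        if (version < VERSION_WITH_FIELD) {
            return null;
        }
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
            return in.readBoolean() ? in.readLong() : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point of the presence flag is that {{null}} stays a valid value after the version bump, so a job restored from an old snapshot behaves exactly as it does today rather than failing to deserialize.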

I'd be happy to pick up (1) and (2) as a first PR. The trickiest piece is a 
deterministic IT test. I'm thinking of injecting a restart in the precise 
window after {{snapshotState()}} has run, so that the regression test fails 
today and passes once the state is persisted.

> Newly-added tables become ghost tables after job restart during initial 
> snapshot phase
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-39478
>                 URL: https://issues.apache.org/jira/browse/FLINK-39478
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>         Environment: * Flink CDC version: 3.4.0 (also seen on the latest)
>  * {{scan.newly-added-table.enabled: true}}
>  * Checkpoint interval: 5 minutes, exactly-once, RocksDB backend
>  * Parallelism: 4
>  * Restarts triggered by both external savepoint-cancel and pod 
> replacement/checkpoint-restore
>            Reporter: Vinay Sagar Gonabavi
>            Priority: Critical
>
> *Summary*
> When {{scan.newly-added-table.enabled=true}} and a job restarts 
> (savepoint-cancel or checkpoint-restore) while a newly-added table is 
> mid-snapshot, the table can silently stop emitting events. The table remains 
> in the pipeline config and the job runs healthily for all other tables, but 
> the affected table produces zero data indefinitely.
> *Production Evidence*
> Two tables independently exhibited this behavior:
> ||Table||Rows||Snapshot duration||Paimon snapshots before death||Death 
> trigger||Data after restart||
> |{{table_1}}|100M+|multi-day|96 over 2 days|savepoint-cancel restart (Feb 
> 27)|zero|
> |{{table_2}}|100M+|multi-day|201 over 1.5 days|survived 1st restart, died on 
> 2nd|zero|
>  * Job continues running healthily for all other tables (confirmed by Paimon 
> counts and operator metrics)
>  * ~60K records still flowing are all from working tables
>  * Both tables confirmed mid-snapshot at time of restart via S3 data
> Notably, {{table_2}} survived one restart and died on the second. This 
> non-determinism points to a timing/race condition in the recovery path rather 
> than a guaranteed failure. The outcome depends on exactly which state the 
> assigner was in at the checkpoint boundary.
> *Root Cause Analysis*
> The Flink CDC MySQL source uses a finite state machine 
> ({{AssignerStatus}}) to manage newly-added table snapshots. The state 
> progression is:
>  
> {code:java}
>  INITIAL_ASSIGNING_FINISHED (1)
>   → NEWLY_ADDED_ASSIGNING (2)          // snapshot splits being assigned
>   → NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED (3)  // all splits done, awaiting binlog update
>   → NEWLY_ADDED_ASSIGNING_FINISHED (4) // binlog handshake complete{code}
> We were unable to determine the exact failing code path due to limited 
> observability into the assigner state at the time of the restarts. However, 
> code analysis reveals that a restart during states 2 or 3 exposes multiple 
> fragile recovery windows, any of which could permanently stall the state 
> machine:
> *1. Transient {{checkpointIdToFinish}} field 
> (MySqlSnapshotSplitAssigner.java)*
> This field gates the {{NEWLY_ADDED_ASSIGNING → 
> NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED}} transition but is *not 
> checkpointed*. After restore it's {{null}}, requiring 2 additional 
> checkpoint cycles to re-derive (one {{snapshotState()}} to set it, one 
> {{notifyCheckpointComplete()}} to act on it). If the job restarts again 
> within those 2 cycles, the window resets. This could explain why {{table_2}} 
> survived one restart but not the second: the first restart may have landed on 
> a clean boundary, while the second hit before the 2-cycle recovery completed.
> *2. Multi-step binlog update handshake between states 3→4* 
> (*MySqlSourceEnumerator.java*)
> The transition from state 3 to 4 requires: enumerator sends 
> {{BinlogSplitUpdateRequestEvent}} → reader suspends binlog → reader requests 
> latest finished splits number → enumerator responds → reader updates binlog 
> split → reader sends {{BinlogSplitUpdateAckEvent}} → 
> {{onBinlogSplitUpdated()}}. A restart during this handshake restores 
> state 3, and {{MySqlHybridSplitAssigner.getNext()}} returns 
> {{Optional.empty()}} for all split requests until 
> {{requestBinlogSplitUpdateIfNeed()}} re-triggers the handshake via 
> {{syncWithReaders()}}.
> *3. {{captureNewlyAddedTables()}} guard skips re-discovery* 
> (*MySqlSnapshotSplitAssigner.java*)
> {code:java}
> if (... && AssignerStatus.isAssigningFinished(assignerStatus)) { {code}
> This only passes for states 1 and 4. If the restored state is 2 or 3, the 
> entire newly-added table re-discovery is skipped. The code assumes 
> in-progress work will resume from checkpoint state, but as described above, 
> the recovery path depends on transient fields and multi-step coordination 
> that may not complete before the next restart.
> *4. Existing test acknowledges this fragility* 
> (*NewlyAddedTableITCase.java*):
> {code:java}
> // sleep 1s to wait for the assign status to INITIAL_ASSIGNING_FINISHED.
> // Otherwise, the restart job won't read newly added tables, and this test will be stuck.
> Thread.sleep(1000L); {code}
> The combination of transient fields, multi-step handshakes, and guarded 
> re-discovery creates multiple windows where a restart can permanently stall 
> the state machine. The non-deterministic behavior across our two affected 
> tables is consistent with this: the outcome depends on the exact assigner 
> state at the checkpoint boundary, which varies by timing.
> *Impact*
>  * Large tables (100M+ rows) are disproportionately affected because their 
> multi-day snapshot maximizes the window where a restart hits the vulnerable 
> state
>  * Silent failure - no errors logged, job appears healthy, only detectable by 
> monitoring per-table output
>  * Unrecoverable without dropping the table from the pipeline and re-adding 
> it (full re-snapshot)
>  
> *Suggested Fix Directions*
>  # Persist {{checkpointIdToFinish}} in {{SnapshotPendingSplitsState}} so the 
> state machine can advance immediately after restore without waiting for 2 
> additional checkpoint cycles
>  # Make the binlog update handshake (state 3→4) idempotent and automatically 
> re-triggerable on restore, rather than relying on {{syncWithReaders()}} to 
> eventually re-trigger it
>  # Add a safety mechanism: if {{assignerStatus}} is in state 2 or 3 and no 
> progress (split assignments or finished reports) occurs within N checkpoint 
> cycles, log a warning and consider resetting the table's snapshot
>  # Expose {{assignerStatus}} and per-table snapshot progress as JMX/Flink 
> metrics - the lack of observability into the assigner state made this 
> extremely difficult to diagnose and made it impossible to determine the exact 
> failing code path
> *Workaround*
> When adding large tables with {{scan.newly-added-table.enabled}}, avoid 
> restarting the job until the initial snapshot completes. If a restart occurs 
> mid-snapshot and the table becomes a ghost, remove the table from the 
> pipeline config, restart, then re-add it to force a fresh snapshot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
