spoorthibasu opened a new pull request, #4418:
URL: https://github.com/apache/flink-cdc/pull/4418

   **TL;DR:** Fix JobManager OOM caused by a large number of snapshot splits by 
releasing the snapshot split metadata once the source is in the binlog phase: 
after the binlog split is assigned, its metadata is assembled by the reader, and 
that state is covered by a completed checkpoint.
   
   ## Root Cause
   
   After the snapshot phase finishes and the binlog split is assigned, the 
source coordinator keeps the full snapshot split metadata indefinitely.
   
   - `MySqlSnapshotSplitAssigner` holds `assignedSplits`, 
`splitFinishedOffsets`, and `tableSchemas` in memory.
   - `snapshotState()` rebuilds the complete `SnapshotPendingSplitsState` on 
every checkpoint, `MySqlHybridSplitAssigner` wraps it, and 
`notifyCheckpointComplete()` never drops it.
   - `MySqlSourceEnumerator` keeps a second copy of the same finished-split 
metadata in its `binlogSplitMeta` cache.
   
   With many finished splits (for example ~300K for a 2.5B-row table at the 
default chunk size of 8096 rows), every post-snapshot checkpoint re-serializes the 
entire snapshot state. This keeps JobManager heap high, and because the large 
coordinator state is copied during checkpoint serialization and transfer, it 
also drives up direct/off-heap memory and can OOM-kill the container even when 
the heap itself does not exceed its limit.
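
The ~300K figure is a ceiling division of row count by chunk size. A minimal sketch, assuming evenly sized chunks (tables with sparse or skewed keys can split differently); `SplitCountEstimate` and `chunkCount` are hypothetical names used only for illustration:

```java
/** Back-of-the-envelope estimate of how many snapshot splits a table produces. */
public class SplitCountEstimate {

    /** Ceiling division: a final partial chunk still becomes its own split. */
    static long chunkCount(long rows, int chunkSize) {
        if (rows < 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("rows must be >= 0 and chunkSize > 0");
        }
        return (rows + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        long rows = 2_500_000_000L; // 2.5B rows, as in the example above
        int chunkSize = 8096;       // the default chunk size cited above
        // Assumes uniform chunks; real chunk boundaries depend on the key distribution.
        System.out.println(chunkCount(rows, chunkSize) + " snapshot splits"); // 308795 snapshot splits
    }
}
```

Each of those splits has entries in `assignedSplits` and `splitFinishedOffsets`, which is why the per-checkpoint cost grows with the split count.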
   
   ## Fix
   
   Release the snapshot split metadata once it is no longer needed for 
correctness: after the binlog split has been created, its finished-split 
metadata has been fully transferred to and assembled by the reader, and that 
state is covered by a completed checkpoint.
   
   ### Reader-side (completion signal)
   
   - When a complete binlog split enters reading (whether its metadata arrived 
inline or was assembled over the divided meta-group requests), 
`MySqlSourceReader` sends a new `BinlogSplitMetaAssembledEvent` to the 
coordinator.
   - The event is idempotent: it is re-sent whenever a complete binlog split 
(re-)enters reading, including after a restore.
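
The reader-side rule can be modeled as: signal the coordinator every time a complete binlog split starts reading, whether it arrived complete or was filled in group by group. This is a hypothetical, simplified model; `BinlogSplitModel`, `ReaderModel`, the `"RequestMetaGroup"` message, and the string-based event channel are illustration-only stand-ins, not the real `MySqlBinlogSplit` or `MySqlSourceReader` APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of the reader-side completion signal (illustration only). */
public class ReaderSignalSketch {

    /** Stand-in for a binlog split that carries finished-snapshot-split metadata. */
    static final class BinlogSplitModel {
        final int totalFinishedSplitSize; // entries the complete split must hold
        final List<String> finishedSplitInfos = new ArrayList<>();

        BinlogSplitModel(int totalFinishedSplitSize) {
            this.totalFinishedSplitSize = totalFinishedSplitSize;
        }

        boolean isComplete() {
            return finishedSplitInfos.size() == totalFinishedSplitSize;
        }
    }

    /** Stand-in for the reader; messages to the coordinator go through a callback. */
    static final class ReaderModel {
        private final Consumer<String> sendToCoordinator;

        ReaderModel(Consumer<String> sendToCoordinator) {
            this.sendToCoordinator = sendToCoordinator;
        }

        /** A binlog split was assigned, or restored from reader state. */
        void onBinlogSplitAdded(BinlogSplitModel split) {
            startReadingOrRequestMore(split);
        }

        /** One divided meta group arrived from the coordinator. */
        void onMetaGroup(BinlogSplitModel split, List<String> group) {
            split.finishedSplitInfos.addAll(group);
            startReadingOrRequestMore(split);
        }

        private void startReadingOrRequestMore(BinlogSplitModel split) {
            if (split.isComplete()) {
                // Complete split enters reading: signal every time, so a restore re-sends it.
                sendToCoordinator.accept("BinlogSplitMetaAssembledEvent");
            } else {
                sendToCoordinator.accept("RequestMetaGroup");
            }
        }
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        ReaderModel reader = new ReaderModel(sent::add);
        BinlogSplitModel split = new BinlogSplitModel(3);

        reader.onBinlogSplitAdded(split);               // incomplete: asks for metadata
        reader.onMetaGroup(split, List.of("s0", "s1")); // still incomplete
        reader.onMetaGroup(split, List.of("s2"));       // complete: sends the event
        reader.onBinlogSplitAdded(split);               // restore of a complete split: sends it again
        System.out.println(sent);
    }
}
```

Because the coordinator only records a boolean on receipt, duplicate events from restores cost nothing.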
   
   ### Coordinator-side (gated release)
   
   - On receiving the event, `MySqlHybridSplitAssigner` records that the reader 
has assembled the metadata.
   - `snapshotState()` schedules the release at the current checkpoint id; the 
actual clearing happens in `notifyCheckpointComplete()` once that checkpoint 
completes.
   - `releaseSnapshotMetadata()` clears `assignedSplits`, 
`splitFinishedOffsets`, and `tableSchemas`. `alreadyProcessedTables`, the 
assigner status, and the chunk splitter state are kept so a restore does not 
re-discover tables.
   - `MySqlSourceEnumerator` clears its cached `binlogSplitMeta` at the same 
point.
   - Release is skipped entirely when `scan.newly-added-table.enabled` is set, 
since that flow may need the metadata again to extend the binlog split.
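
The gated release can be sketched as a small state machine: the event only records a fact, `snapshotState()` only schedules, and only `notifyCheckpointComplete()` clears. This is a hypothetical model whose field names mirror the PR text; the class, its methods, and the choice to treat a later completed checkpoint as also covering the scheduled one (Flink's checkpoint-complete notifications may not arrive for every checkpoint) are assumptions of the sketch, not the actual `MySqlHybridSplitAssigner` code:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the checkpoint-gated metadata release (illustration only). */
public class GatedReleaseSketch {

    private static final long NONE = -1L;

    // Metadata that is released once it is safe to do so.
    final Map<String, String> assignedSplits = new HashMap<>();
    final Map<String, Long> splitFinishedOffsets = new HashMap<>();
    final Map<String, String> tableSchemas = new HashMap<>();
    // Kept across the release so a restore does not re-discover tables.
    final Set<String> alreadyProcessedTables = new HashSet<>();

    private final boolean newlyAddedTableEnabled;
    private boolean binlogSplitAssigned;
    private boolean metaAssembled;
    private long releaseCheckpointId = NONE;
    private boolean released;

    GatedReleaseSketch(boolean newlyAddedTableEnabled) {
        this.newlyAddedTableEnabled = newlyAddedTableEnabled;
    }

    void onBinlogSplitAssigned() {
        binlogSplitAssigned = true;
    }

    /** The reader reported that it assembled the metadata; duplicates are harmless. */
    void onMetaAssembledEvent() {
        metaAssembled = true;
    }

    /** Only schedules the release: this checkpoint may still fail. */
    void snapshotState(long checkpointId) {
        boolean eligible = !released && !newlyAddedTableEnabled
                && binlogSplitAssigned && metaAssembled;
        if (eligible && releaseCheckpointId == NONE) {
            releaseCheckpointId = checkpointId;
        }
    }

    /** Clears only after the scheduling checkpoint (or a later one) has completed. */
    void notifyCheckpointComplete(long checkpointId) {
        if (releaseCheckpointId != NONE && checkpointId >= releaseCheckpointId) {
            assignedSplits.clear();
            splitFinishedOffsets.clear();
            tableSchemas.clear();
            released = true;
            releaseCheckpointId = NONE;
        }
    }

    /** Reader failover before release: the binlog split comes back, so reset the schedule. */
    void addBinlogSplitBack() {
        binlogSplitAssigned = false;
        metaAssembled = false; // the reader re-sends the event once the recreated split is complete
        releaseCheckpointId = NONE;
    }

    boolean isReleased() {
        return released;
    }

    public static void main(String[] args) {
        GatedReleaseSketch assigner = new GatedReleaseSketch(false);
        assigner.assignedSplits.put("db.t1:0", "snapshot-split-0");
        assigner.alreadyProcessedTables.add("db.t1");
        assigner.onBinlogSplitAssigned();
        assigner.onMetaAssembledEvent();

        assigner.snapshotState(7);            // release scheduled at checkpoint 7
        assigner.notifyCheckpointComplete(6); // older checkpoint: nothing is cleared
        System.out.println(assigner.isReleased()); // false
        assigner.notifyCheckpointComplete(7); // scheduling checkpoint completed: clear
        System.out.println(assigner.isReleased() + " " + assigner.assignedSplits.size()
                + " " + assigner.alreadyProcessedTables.size()); // true 0 1
    }
}
```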
   
   ## Why the release is safe
   
   - The clearing happens only in `notifyCheckpointComplete()`, never in 
`snapshotState()`. Flink only re-delivers (`addSplitsBack`) split assignments 
that are not covered by the last completed checkpoint. Because the binlog split 
was assigned before the release-scheduling checkpoint, that assignment is 
checkpoint-covered by the time release runs, so the binlog split can never be 
added back into an emptied assigner. `createBinlogSplit()` also asserts the 
metadata has not been released, turning any future regression into a fast 
failure instead of silent data loss.
   - If the binlog split is added back before release (reader failover between 
scheduling and the checkpoint completing), the scheduled release is reset and 
the metadata is still intact, so the split is recreated normally.
   - On restore from a released ("light") state, the maps are empty, 
`isBinlogSplitAssigned` is true, and `alreadyProcessedTables` is non-empty, so 
the assigner does not re-discover tables and does not recreate the binlog 
split. The reader restores its own complete binlog split, requests no metadata, 
and re-sends the assembled event; this is harmless because clearing maps that 
are already empty is a no-op, so the release is idempotent.
   - The state and serializer format are unchanged. A released state is a 
normal `SnapshotPendingSplitsState` with empty maps, so it serializes under the 
existing version and stays compatible on restore.
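
The restore decisions and the `createBinlogSplit()` guard can be sketched the same way. This is hypothetical: `RestoredState` carries only the fields the PR mentions, and treating an empty `assignedSplits` map as "released" assumes every captured table yields at least one snapshot split; the real assigner may detect this differently:

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical model of restoring from a released ("light") assigner state. */
public class LightStateRestoreSketch {

    /** Stand-in for SnapshotPendingSplitsState, reduced to the fields used here. */
    static final class RestoredState {
        final Map<String, String> assignedSplits;
        final Map<String, Long> splitFinishedOffsets;
        final boolean isBinlogSplitAssigned;
        final Set<String> alreadyProcessedTables;

        RestoredState(Map<String, String> assignedSplits, Map<String, Long> splitFinishedOffsets,
                      boolean isBinlogSplitAssigned, Set<String> alreadyProcessedTables) {
            this.assignedSplits = assignedSplits;
            this.splitFinishedOffsets = splitFinishedOffsets;
            this.isBinlogSplitAssigned = isBinlogSplitAssigned;
            this.alreadyProcessedTables = alreadyProcessedTables;
        }
    }

    /** Non-empty processed tables: do not re-discover tables after restore. */
    static boolean shouldDiscoverTables(RestoredState s) {
        return s.alreadyProcessedTables.isEmpty();
    }

    /** Binlog split already assigned: the reader restores it from its own state. */
    static boolean shouldCreateBinlogSplit(RestoredState s) {
        return !s.isBinlogSplitAssigned;
    }

    /** Fails fast instead of building a binlog split with no finished-split metadata. */
    static String createBinlogSplit(RestoredState s) {
        // Assumption of this sketch: an empty map means the metadata was released.
        if (s.assignedSplits.isEmpty()) {
            throw new IllegalStateException(
                    "snapshot split metadata was released; cannot create binlog split");
        }
        return "binlog split with " + s.splitFinishedOffsets.size() + " finished-split entries";
    }

    public static void main(String[] args) {
        RestoredState light = new RestoredState(Map.of(), Map.of(), true, Set.of("db.orders"));
        System.out.println(shouldDiscoverTables(light));    // false
        System.out.println(shouldCreateBinlogSplit(light)); // false
        try {
            createBinlogSplit(light);
        } catch (IllegalStateException e) {
            System.out.println("fast failure: " + e.getMessage());
        }
    }
}
```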
   
   ## Tests Added
   
   - **MySqlSnapshotMetadataReleaseTest** (new)
     Release after the assembled event plus a completed checkpoint; no release 
without the event; no release before the checkpoint completes; no release when 
newly-added-table is enabled; add-back resets the schedule and the binlog split 
is recreated with full metadata.
   
   - **PendingSplitsStateSerializerTest.testSerializeAndDeserializeReleasedHybridState**
     Round-trips the released ("light") hybrid state, confirming empty maps and 
`isBinlogSplitAssigned` survive serialization under the current version.
   
   - **MySqlSourceReaderTest.testReaderSendsBinlogSplitMetaAssembledEventForCompleteSplit**
     The reader emits the assembled event when a complete binlog split enters 
reading.
   
   - **MySqlSourceITCase**
     TaskManager and JobManager failover in the binlog phase, plus reads across 
multiple tables at higher parallelism, all pass with no data loss.
   
   ## Notes
   
   - The common path is unchanged until the source is well into the binlog 
phase; release only happens after the binlog split is assigned and one covering 
checkpoint completes.
   - No new configuration is introduced, and no state or serializer version 
bump is required.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
