spoorthibasu opened a new pull request, #4418:
URL: https://github.com/apache/flink-cdc/pull/4418
**TL;DR:** Fix JobManager OOM with a large number of snapshot splits by
releasing the snapshot split metadata after the source enters the binlog phase,
once the binlog split is assigned and covered by a completed checkpoint.
## Root Cause
After the snapshot phase finishes and the binlog split is assigned, the
source coordinator keeps the full snapshot split metadata indefinitely.
- `MySqlSnapshotSplitAssigner` holds `assignedSplits`,
`splitFinishedOffsets`, and `tableSchemas` in memory.
- `snapshotState()` rebuilds the complete `SnapshotPendingSplitsState` on
every checkpoint, `MySqlHybridSplitAssigner` wraps it, and
`notifyCheckpointComplete()` never drops it.
- `MySqlSourceEnumerator` keeps a second copy of the same finished-split
metadata in its `binlogSplitMeta` cache.
With many finished splits (for example ~300K for a 2.5B-row table at the
default `scan.incremental.snapshot.chunk.size` of 8096 rows), every post-snapshot checkpoint re-serializes the
entire snapshot state. This keeps JobManager heap high, and because the large
coordinator state is copied during checkpoint serialization and transfer, it
also drives up direct/off-heap memory and can OOM-kill the container even when
the heap itself does not exceed its limit.
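The ~300K figure is a ceiling division of the row count by the chunk size. Here is a quick Java check. It assumes perfectly even chunks, and `SnapshotSplitEstimate` and `estimateSplits` are hypothetical helpers, not flink-cdc code:

```java
// Back-of-the-envelope count of snapshot splits for the example above.
// Assumes every chunk holds exactly chunkSize rows; real chunk boundaries
// depend on the distribution of the split key.
class SnapshotSplitEstimate {

    /** Ceiling division: number of chunks needed to cover {@code rows} rows. */
    static long estimateSplits(long rows, long chunkSize) {
        if (rows < 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("rows must be >= 0 and chunkSize > 0");
        }
        return (rows + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        long rows = 2_500_000_000L; // 2.5B-row table from the example
        long chunkSize = 8096;      // default chunk size quoted in the text
        System.out.println(estimateSplits(rows, chunkSize)); // prints 308795
    }
}
```

Real chunking follows the split-key distribution, so treat this as an order-of-magnitude estimate. Before this fix, every one of these splits stayed in the assigner's maps and was re-serialized on each checkpoint.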
## Fix
Release the snapshot split metadata once it is no longer needed for
correctness: after the binlog split has been created, its finished-split
metadata has been fully transferred to and assembled by the reader, and a
checkpoint taken after that point has completed.
### Reader-side (completion signal)
- When a complete binlog split enters reading (whether its metadata arrived
inline or was assembled over the divided meta-group requests),
`MySqlSourceReader` sends a new `BinlogSplitMetaAssembledEvent` to the
coordinator.
- The event is idempotent: it is re-sent whenever a complete binlog split
(re-)enters reading, including after a restore.
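Here is a minimal sketch of the reader-side rule. It assumes a simplified split that only counts meta groups. The event name `BinlogSplitMetaAssembledEvent` comes from this PR. Every other type and method (`ReaderSignalSketch`, `onBinlogSplitAdded`, `onMetaGroupReceived`, `startReading`) is hypothetical. A plain list stands in for Flink's `SourceReaderContext#sendSourceEventToCoordinator`:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the reader-side completion signal. Only the event name
// comes from the PR; the split type and method names are illustrative stand-ins.
class ReaderSignalSketch {

    /** Stand-in for BinlogSplitMetaAssembledEvent; carries no payload here. */
    static final class BinlogSplitMetaAssembledEvent {}

    /** Simplified binlog split that only tracks finished-split meta groups. */
    static final class BinlogSplit {
        final int totalMetaGroups;
        int receivedMetaGroups;

        BinlogSplit(int totalMetaGroups, int receivedMetaGroups) {
            this.totalMetaGroups = totalMetaGroups;
            this.receivedMetaGroups = receivedMetaGroups;
        }

        boolean isComplete() {
            return receivedMetaGroups >= totalMetaGroups;
        }
    }

    /** Events sent to the coordinator (stand-in for sendSourceEventToCoordinator). */
    final List<Object> sentToCoordinator = new ArrayList<>();

    /** A binlog split arrives, either on first assignment or restored from reader state. */
    void onBinlogSplitAdded(BinlogSplit split) {
        if (split.isComplete()) {
            startReading(); // metadata arrived inline or was restored complete
        }
        // Otherwise the reader keeps requesting meta groups until assembly finishes.
    }

    /** One divided meta group arrives from the coordinator. */
    void onMetaGroupReceived(BinlogSplit split) {
        if (split.isComplete()) {
            return; // ignore duplicates after assembly
        }
        split.receivedMetaGroups++;
        if (split.isComplete()) {
            startReading(); // assembled over divided meta-group requests
        }
    }

    private void startReading() {
        // Sent every time a complete split (re-)enters reading, so it may repeat.
        sentToCoordinator.add(new BinlogSplitMetaAssembledEvent());
    }

    public static void main(String[] args) {
        ReaderSignalSketch reader = new ReaderSignalSketch();
        BinlogSplit divided = new BinlogSplit(3, 0);
        reader.onBinlogSplitAdded(divided); // incomplete: no event yet
        for (int i = 0; i < 3; i++) {
            reader.onMetaGroupReceived(divided); // third group completes it
        }
        reader.onBinlogSplitAdded(new BinlogSplit(2, 2)); // restored complete split
        System.out.println(reader.sentToCoordinator.size()); // prints 2
    }
}
```

Both arrival paths converge on `startReading`. So the signal does not depend on how the metadata arrived, and repeating it after a restore is expected.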
### Coordinator-side (gated release)
- On receiving the event, `MySqlHybridSplitAssigner` records that the reader
has assembled the metadata.
- `snapshotState()` schedules the release at the current checkpoint id; the
actual clearing happens in `notifyCheckpointComplete()` once that checkpoint
completes.
- `releaseSnapshotMetadata()` clears `assignedSplits`,
`splitFinishedOffsets`, and `tableSchemas`. `alreadyProcessedTables`, the
assigner status, and the chunk splitter state are kept so a restore does not
re-discover tables.
- `MySqlSourceEnumerator` clears its cached `binlogSplitMeta` at the same
point.
- Release is skipped entirely when `scan.newly-added-table.enabled` is set,
since that flow may need the metadata again to extend the binlog split.
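The coordinator-side gating can be modelled as a small state machine. This is a simplified sketch that assumes string-typed maps and hypothetical names (`GatedReleaseSketch`, `onMetaAssembledEvent`, `addBinlogSplitBack`). It is not the actual `MySqlHybridSplitAssigner` code. One assumption to flag: the sketch releases when any checkpoint at or after the scheduled id completes, because Flink's `CheckpointListener` contract does not promise a completion notification for every checkpoint.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the gated release. Names echo the PR text, but
// types are reduced to strings and this is not the real MySqlHybridSplitAssigner.
class GatedReleaseSketch {
    static final long NONE = -1L; // no release scheduled

    final boolean newlyAddedTableEnabled; // mirrors scan.newly-added-table.enabled
    final Map<String, String> assignedSplits = new HashMap<>();
    final Map<String, String> splitFinishedOffsets = new HashMap<>();
    final Map<String, String> tableSchemas = new HashMap<>();
    final Set<String> alreadyProcessedTables = new HashSet<>(); // kept on release
    boolean binlogSplitAssigned;
    boolean metaAssembledByReader;
    long releaseAtCheckpoint = NONE;

    GatedReleaseSketch(boolean newlyAddedTableEnabled) {
        this.newlyAddedTableEnabled = newlyAddedTableEnabled;
    }

    /** The coordinator received BinlogSplitMetaAssembledEvent. */
    void onMetaAssembledEvent() {
        metaAssembledByReader = true;
    }

    /** Only schedules; never clears, so this checkpoint still holds the full metadata. */
    void snapshotState(long checkpointId) {
        boolean eligible = !newlyAddedTableEnabled && binlogSplitAssigned && metaAssembledByReader;
        if (eligible && releaseAtCheckpoint == NONE) {
            releaseAtCheckpoint = checkpointId;
        }
    }

    /** Clears only once the scheduled checkpoint (or a later one) is confirmed complete. */
    void notifyCheckpointComplete(long checkpointId) {
        if (releaseAtCheckpoint != NONE && checkpointId >= releaseAtCheckpoint) {
            releaseSnapshotMetadata();
        }
    }

    /** Binlog split handed back before release ran: cancel the schedule, keep metadata. */
    void addBinlogSplitBack() {
        binlogSplitAssigned = false;
        metaAssembledByReader = false;
        releaseAtCheckpoint = NONE;
    }

    void releaseSnapshotMetadata() {
        assignedSplits.clear();
        splitFinishedOffsets.clear();
        tableSchemas.clear();
        // alreadyProcessedTables and binlogSplitAssigned are intentionally kept.
        metaAssembledByReader = false; // a re-sent event triggers a harmless repeat
        releaseAtCheckpoint = NONE;
    }

    public static void main(String[] args) {
        GatedReleaseSketch assigner = new GatedReleaseSketch(false);
        assigner.assignedSplits.put("db.orders:0", "split-0");
        assigner.binlogSplitAssigned = true;
        assigner.onMetaAssembledEvent();
        assigner.snapshotState(5);            // schedules release at checkpoint 5
        assigner.notifyCheckpointComplete(5); // clears the snapshot metadata
        System.out.println(assigner.assignedSplits.isEmpty()); // prints true
    }
}
```

The add-back reset covers the failover case discussed under the safety reasoning. Nothing has been cleared at that point, so the binlog split can be rebuilt from intact metadata.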
## Why the release is safe
- The clearing happens only in `notifyCheckpointComplete()`, never in
`snapshotState()`. Flink only re-delivers (`addSplitsBack`) split assignments
that are not covered by the last completed checkpoint. Because the binlog split
was assigned before the release-scheduling checkpoint, that assignment is
checkpoint-covered by the time release runs, so the binlog split can never be
added back into an emptied assigner. `createBinlogSplit()` also asserts the
metadata has not been released, turning any future regression into a fast
failure instead of silent data loss.
- If the binlog split is added back before release (reader failover between
scheduling and the checkpoint completing), the scheduled release is reset and
the metadata is still intact, so the split is recreated normally.
- On restore from a released ("light") state, the maps are empty,
`isBinlogSplitAssigned` is true, and `alreadyProcessedTables` is non-empty, so
the assigner does not re-discover tables and does not recreate the binlog
split. The reader restores its own complete binlog split, requests no metadata,
and re-sends the assembled event; a repeated release on already-empty maps is a
no-op, so the re-sent event is harmless.
- The state and serializer format are unchanged. A released state is a
normal `SnapshotPendingSplitsState` with empty maps, so it serializes under the
existing version and stays compatible on restore.
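The first two safety points depend on which assignments Flink hands back when a reader fails. The sketch below models that rule under one assumption: only assignments not yet captured by a completed checkpoint are returned through `addSplitsBack`. `AddBackSketch` and its methods are hypothetical. They mirror the idea behind Flink's split assignment tracking, not its implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of split-assignment tracking in a source coordinator: on reader
// failure, only assignments not yet covered by a completed checkpoint are handed back
// via addSplitsBack. It mirrors the idea behind Flink's tracking, not its actual class.
class AddBackSketch {
    private final List<String> sinceLastSnapshot = new ArrayList<>();
    private final TreeMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>();

    void recordAssignment(String splitId) {
        sinceLastSnapshot.add(splitId);
    }

    /** Assignments made so far become part of checkpoint {@code checkpointId}. */
    void onSnapshot(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, new ArrayList<>(sinceLastSnapshot));
        sinceLastSnapshot.clear();
    }

    /** Completing checkpoint N makes every assignment captured in checkpoints <= N durable. */
    void onCheckpointComplete(long checkpointId) {
        pendingByCheckpoint.headMap(checkpointId, true).clear();
    }

    /** What addSplitsBack would receive if the reader failed right now. */
    List<String> splitsToAddBack() {
        List<String> result = new ArrayList<>();
        pendingByCheckpoint.values().forEach(result::addAll);
        result.addAll(sinceLastSnapshot);
        return result;
    }

    public static void main(String[] args) {
        AddBackSketch tracker = new AddBackSketch();
        tracker.recordAssignment("binlog-split"); // assigned before the scheduling checkpoint
        tracker.onSnapshot(5);                     // release scheduled at checkpoint 5
        System.out.println(tracker.splitsToAddBack()); // prints [binlog-split]
        tracker.onCheckpointComplete(5);           // release would run here
        System.out.println(tracker.splitsToAddBack()); // prints []
    }
}
```

The release runs only after the scheduling checkpoint completes. By that time the binlog split has already left the add-back set. Before that point it can still be added back, but the metadata is also still intact.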
## Tests Added
- **MySqlSnapshotMetadataReleaseTest** (new)
Release after the assembled event plus a completed checkpoint; no release
without the event; no release before the checkpoint completes; no release when
newly-added-table is enabled; add-back resets the schedule and the binlog split
is recreated with full metadata.
- **PendingSplitsStateSerializerTest.testSerializeAndDeserializeReleasedHybridState**
Round-trips the released ("light") hybrid state, confirming empty maps and
`isBinlogSplitAssigned` survive serialization under the current version.
- **MySqlSourceReaderTest.testReaderSendsBinlogSplitMetaAssembledEventForCompleteSplit**
The reader emits the assembled event when a complete binlog split enters
reading.
- **MySqlSourceITCase**
TaskManager and JobManager failover in the binlog phase, plus reads across
multiple tables at higher parallelism, all pass with no data loss.
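The released-state round-trip can be illustrated with a deliberately simplified serializer. The byte layout, the `VERSION` constant, and every name here are hypothetical and do not reproduce `PendingSplitsStateSerializer`. The sketch shows only why a state with empty maps needs no version bump: it uses the same layout, and each empty map is written as a zero count.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified layout: version, two size-prefixed string maps,
// then the binlog-assigned flag. This is NOT PendingSplitsStateSerializer's format.
class ReleasedStateRoundTrip {
    static final int VERSION = 1; // hypothetical version number

    static final class State {
        final Map<String, String> assignedSplits;
        final Map<String, String> splitFinishedOffsets;
        final boolean isBinlogSplitAssigned;

        State(Map<String, String> assignedSplits, Map<String, String> splitFinishedOffsets,
                boolean isBinlogSplitAssigned) {
            this.assignedSplits = assignedSplits;
            this.splitFinishedOffsets = splitFinishedOffsets;
            this.isBinlogSplitAssigned = isBinlogSplitAssigned;
        }
    }

    static byte[] serialize(State state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(VERSION);
            writeMap(out, state.assignedSplits);
            writeMap(out, state.splitFinishedOffsets);
            out.writeBoolean(state.isBinlogSplitAssigned);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static State deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != VERSION) {
                throw new IOException("unsupported version " + version);
            }
            // Java evaluates arguments left to right, matching the write order.
            return new State(readMap(in), readMap(in), in.readBoolean());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeMap(DataOutputStream out, Map<String, String> map) throws IOException {
        out.writeInt(map.size()); // a released (empty) map is just a zero count
        for (Map.Entry<String, String> e : map.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }

    private static Map<String, String> readMap(DataInputStream in) throws IOException {
        int size = in.readInt();
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i < size; i++) {
            map.put(in.readUTF(), in.readUTF());
        }
        return map;
    }

    public static void main(String[] args) {
        State released = new State(new HashMap<>(), new HashMap<>(), true);
        byte[] bytes = serialize(released);
        State restored = deserialize(bytes);
        // prints 13 true true (4-byte version + two 4-byte counts + 1-byte flag)
        System.out.println(bytes.length + " " + restored.assignedSplits.isEmpty() + " " + restored.isBinlogSplitAssigned);
    }
}
```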
## Notes
- The common path is unchanged until the source is well into the binlog
phase; release only happens after the binlog split is assigned and one covering
checkpoint completes.
- No new configuration is introduced, and no state or serializer version
bump is required.