yuanfenghu created FLINK-39775:
----------------------------------
Summary: Snapshot split metadata is not released after switching
to stream phase, causing large JobManager coordinator state and OOM with many
snapshot splits
Key: FLINK-39775
URL: https://issues.apache.org/jira/browse/FLINK-39775
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.5.0
Reporter: yuanfenghu
h2. Description
We observed a JobManager OOM when running a Flink CDC incremental snapshot job
with a large number of snapshot splits.
The job can finish the snapshot phase and switch to the stream/binlog phase
successfully. However, the snapshot split metadata appears to remain in the
source coordinator state after the stream split has been assigned. In
large-table scenarios, subsequent checkpoints continue to serialize the full
snapshot split state, which makes the JobManager memory usage stay high and
eventually causes OOM.
h2. Environment
- Flink CDC version: 3.5.0
- Flink version: 1.20.x
- Connector: MySQL CDC / incremental snapshot
- Sink: Paimon
- Startup mode: initial
- `scan.incremental.snapshot.chunk.size`: default `8096`
- Table scale: one table has about 2.5 billion rows, and the whole job contains
many tables
With the default chunk size, one 2.5B-row table can generate roughly 300K
snapshot splits. When multiple large tables are included in one job, the number
of finished snapshot splits becomes very large.
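As a rough check of the split count quoted above, the estimate can be reproduced with a ceiling division of the row count by the chunk size. This is only a back-of-the-envelope sketch: the class and method names are hypothetical, and the real number of splits depends on how the chunk key is distributed.

```java
/** Hypothetical helper for estimating snapshot split counts; not part of Flink CDC. */
final class SplitCountEstimate {

    /** Ceiling division: number of chunks needed to cover rowCount rows. */
    static long estimateSplits(long rowCount, int chunkSize) {
        if (rowCount < 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("rowCount must be >= 0 and chunkSize > 0");
        }
        return (rowCount + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        long rows = 2_500_000_000L; // table size from this report
        int chunkSize = 8096;       // default chunk size listed in the environment
        System.out.println("estimated splits: " + estimateSplits(rows, chunkSize));
    }
}
```

For the 2.5B-row table this gives a little under 309K splits, which matches the "roughly 300K" figure.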
h2. Actual Behavior
After all snapshot splits are finished and the stream/binlog split is assigned:
1. JobManager memory usage remains high.
2. Source coordinator state remains very large.
3. Checkpoints continue to carry large snapshot split metadata.
4. The job may eventually fail with JobManager OOM.
h2. Expected Behavior
Snapshot split metadata should only be retained while it is still required for
correctness, for example until the stream split has been created, transferred
to the readers, and covered by a completed checkpoint.
After the source has safely entered the stream phase, the coordinator should
not continue to retain and checkpoint all heavyweight snapshot split maps such
as assigned snapshot splits and finished split offsets.
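The expected retention rule could be modeled roughly as follows. This is a minimal sketch, not the Flink CDC assigner: the class, its callbacks, and the simplified map value types are all hypothetical, and it assumes the three conditions listed above are sufficient for a safe release.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the retention rule; names and types are illustrative only. */
final class SnapshotMetadataLifecycle {
    private static final long NONE = -1L;

    // Simplified stand-ins for the heavyweight maps named in this report.
    private final Map<String, String> assignedSplits = new HashMap<>();
    private final Map<String, Long> splitFinishedOffsets = new HashMap<>();

    private boolean streamSplitAssigned;
    private boolean streamSplitReceivedByReaders;
    private long coveringCheckpointId = NONE;

    void onSnapshotSplitFinished(String splitId, String description, long offset) {
        assignedSplits.put(splitId, description);
        splitFinishedOffsets.put(splitId, offset);
    }

    void onStreamSplitAssigned() {
        streamSplitAssigned = true;
    }

    void onStreamSplitReceivedByReaders() {
        streamSplitReceivedByReaders = true;
    }

    /** Remembers the first checkpoint taken after the hand-over has finished. */
    void snapshotState(long checkpointId) {
        if (streamSplitAssigned && streamSplitReceivedByReaders && coveringCheckpointId == NONE) {
            coveringCheckpointId = checkpointId;
        }
    }

    /** Releases the maps once the covering checkpoint, or a later one, completes. */
    void notifyCheckpointComplete(long checkpointId) {
        if (coveringCheckpointId != NONE && checkpointId >= coveringCheckpointId) {
            assignedSplits.clear();
            splitFinishedOffsets.clear();
        }
    }

    int retainedEntries() {
        return assignedSplits.size() + splitFinishedOffsets.size();
    }
}
```

The point of the gate is that a checkpoint completing before the hand-over, or one older than the covering checkpoint, does not trigger a release, so a restore from such a checkpoint would still find the full metadata.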
h2. Code Analysis
The issue seems related to the lifecycle of snapshot metadata in the split
assigner.
In the generic source implementation:
- `SnapshotSplitAssigner` keeps the following structures in memory:
  - `assignedSplits`
  - `tableSchemas`
  - `splitFinishedOffsets`
  - `splitFinishedCheckpointIds`
- `onFinishedSplits()` appends finished split offsets into
`splitFinishedOffsets` and records checkpoint ids in
`splitFinishedCheckpointIds`.
- `snapshotState()` always creates `SnapshotPendingSplitsState` with the full
snapshot state:
  - `alreadyProcessedTables`
  - `remainingSplits`
  - `assignedSplits`
  - `tableSchemas`
  - `splitFinishedOffsets`
  - `splitFinishedCheckpointIds`
  - `chunkSplitterState`
- `HybridSplitAssigner.snapshotState()` wraps this full snapshot state inside
`HybridPendingSplitsState`, even after the stream split has already been
assigned.
- `PendingSplitsStateSerializer.serializeHybridPendingSplitsState()` serializes
the full `SnapshotPendingSplitsState` first and then appends only the
`isStreamSplitAssigned` flag.
- `notifyCheckpointComplete()` removes entries from
`splitFinishedCheckpointIds`, but it does not remove the heavyweight metadata
in `assignedSplits` or `splitFinishedOffsets`.
This means that after the snapshot phase is complete, large snapshot metadata
can still be retained in the live coordinator object and serialized into later
checkpoints.
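To illustrate why this pattern keeps checkpoints large, the sketch below compares a serializer that always writes the full per-split map before the flag with a variant that writes an empty map once the stream split is assigned. It is a toy model with a made-up wire format and hypothetical names, not the actual `PendingSplitsStateSerializer`. The compact variant also assumes the metadata is no longer needed for recovery, which in practice would only hold once the stream split is safely covered by a completed checkpoint.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of the serialization pattern; the wire format here is invented. */
final class HybridStateSizeDemo {

    /** Mirrors the reported pattern: full snapshot metadata first, then the flag. */
    static byte[] serializeFull(Map<String, Long> splitFinishedOffsets, boolean isStreamSplitAssigned) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(splitFinishedOffsets.size());
            for (Map.Entry<String, Long> entry : splitFinishedOffsets.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeLong(entry.getValue());
            }
            out.writeBoolean(isStreamSplitAssigned);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Hypothetical alternative: skip the per-split entries once the stream split is assigned. */
    static byte[] serializeCompact(Map<String, Long> splitFinishedOffsets, boolean isStreamSplitAssigned) {
        Map<String, Long> toWrite =
                isStreamSplitAssigned ? Collections.<String, Long>emptyMap() : splitFinishedOffsets;
        return serializeFull(toWrite, isStreamSplitAssigned);
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (int i = 0; i < 300_000; i++) {
            offsets.put("db.table:" + i, (long) i); // synthetic split ids and offsets
        }
        System.out.println("full bytes:    " + serializeFull(offsets, true).length);
        System.out.println("compact bytes: " + serializeCompact(offsets, true).length);
    }
}
```

In this model the full form grows linearly with the number of finished splits on every checkpoint, while the compact form stays constant after the switch to the stream phase.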
The same pattern also exists in the MySQL-specific legacy path:
- `MySqlSnapshotSplitAssigner`
- `MySqlHybridSplitAssigner`
- `PendingSplitsStateSerializer`
For example, `MySqlHybridSplitAssigner.snapshotState()` still wraps
`snapshotSplitAssigner.snapshotState(checkpointId)` in
`HybridPendingSplitsState`, and the MySQL snapshot state serializer writes
`assignedSplits`, `splitFinishedOffsets`, and `tableSchemas`.
h2. Direct Memory / Container OOM Risk
There is another important factor that makes this issue easier to trigger in
containerized deployments.
The snapshot split metadata is retained in the JobManager-side source
coordinator. During checkpointing, this large coordinator state needs to be
serialized and persisted/transferred. This may create multiple copies of the
same metadata in a short period of time, including:
- live Java objects in the coordinator, such as `assignedSplits` and
`splitFinishedOffsets`;
- serialized coordinator state bytes;
- direct/off-heap buffers used by RPC, network, or checkpoint storage clients
during state transfer.
For the JobManager, Flink does not enable the JVM direct memory limit by
default: `jobmanager.memory.enable-jvm-direct-memory-limit` defaults to
`false`. When it is disabled, Flink does not set `-XX:MaxDirectMemorySize` for
the JobManager process, so excessive direct memory usage may not fail early
with a JVM-level `OutOfMemoryError: Direct buffer memory`.
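One way to confirm whether a running JVM was started with an explicit direct memory limit is to inspect its input arguments. The sketch below uses the standard `RuntimeMXBean.getInputArguments()` call; the class and helper names are hypothetical, and it would have to run inside the JobManager process (or be adapted to a remote JMX connection) to be useful.

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Optional;

/** Hypothetical diagnostic; reports whether -XX:MaxDirectMemorySize was passed to this JVM. */
final class DirectMemoryLimitCheck {
    private static final String FLAG_PREFIX = "-XX:MaxDirectMemorySize=";

    /** Returns the first matching JVM argument, if any. */
    static Optional<String> findMaxDirectMemoryFlag(List<String> jvmArgs) {
        return jvmArgs.stream().filter(arg -> arg.startsWith(FLAG_PREFIX)).findFirst();
    }

    public static void main(String[] args) {
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        Optional<String> flag = findMaxDirectMemoryFlag(jvmArgs);
        if (flag.isPresent()) {
            System.out.println("direct memory limit set: " + flag.get());
        } else {
            System.out.println("no explicit -XX:MaxDirectMemorySize argument found");
        }
    }
}
```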
In containerized environments, the container memory limit still applies to the
whole process RSS. Therefore, a large coordinator state may increase
direct/off-heap memory during checkpoint serialization or transfer and
eventually cause the container to be OOMKilled, even if the Java heap itself
does not obviously exceed its limit.
This makes the retained snapshot split metadata more dangerous: it is not only
a large heap/coordinator-state problem, but can also amplify JobManager direct
memory pressure during every checkpoint.
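To observe this pressure, direct buffer usage can be sampled from inside the JVM with the standard `BufferPoolMXBean`. The following is only a sketch with hypothetical class and method names. It reports what the JDK tracks for its "direct" buffer pool and may not include native memory allocated by other means, so container RSS can still be higher than this value.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

/** Hypothetical probe for the JDK-tracked direct buffer pool. */
final class DirectBufferUsage {

    /** Returns bytes used by the "direct" buffer pool, or -1 if the pool is not exposed. */
    static long directMemoryUsedBytes() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directMemoryUsedBytes();
        ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024 * 1024); // simulate a transfer buffer
        long after = directMemoryUsedBytes();
        System.out.println("direct bytes before: " + before);
        System.out.println("direct bytes after:  " + after);
        System.out.println("buffer capacity:     " + buffer.capacity()); // keeps the buffer reachable
    }
}
```

Sampling this value around checkpoints, together with the container's RSS, could help separate heap growth from direct/off-heap growth when reproducing the issue.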