[ 
https://issues.apache.org/jira/browse/FLINK-39775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18084255#comment-18084255
 ] 

Spoorthi Basu commented on FLINK-39775:
---------------------------------------

Hi, I have investigated this issue and identified the root cause.

After the snapshot phase finishes and the stream (binlog) split is assigned, 
the source coordinator keeps retaining the full snapshot split metadata. The 
split assigner holds the finished snapshot split maps in memory, 
{{snapshotState()}} rebuilds the complete {{SnapshotPendingSplitsState}} on 
every checkpoint, and {{notifyCheckpointComplete()}} never drops them. So the 
metadata stays live in the coordinator and is re-serialized in full into every 
later checkpoint, which is costly with many finished splits (for example ~300K 
for a 2.5B-row table at the default chunk size). That keeps JobManager heap 
high, and since the large 
coordinator state is copied during checkpoint serialization and transfer, it 
also drives up direct/off-heap memory and can OOM-kill the container even when 
the heap itself looks fine.
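
As a rough sanity check of the ~300K figure, here is a minimal sketch that 
assumes one snapshot split per chunk-size rows. The real chunk splitter works 
on key ranges, so actual counts can differ. {{SplitCountEstimate}} and 
{{estimateSplits}} are hypothetical names, not Flink CDC APIs.

```java
/** Hypothetical helper for illustration only; not part of Flink CDC. */
final class SplitCountEstimate {

    /**
     * Estimates the number of snapshot splits, assuming each split covers
     * exactly chunkSize rows (a simplification of key-range chunking).
     */
    static long estimateSplits(long rowCount, int chunkSize) {
        if (rowCount < 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("rowCount must be >= 0 and chunkSize > 0");
        }
        // Ceiling division without floating point.
        return (rowCount + chunkSize - 1) / chunkSize;
    }
}
```

Under that assumption, {{estimateSplits(2_500_000_000L, 8096)}} gives 308,795 
splits for a single table, and every one of them contributes entries to the 
retained maps.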

I have a fix that releases this metadata once it is no longer needed for 
correctness, that is, after the binlog split has been created, handed to the 
readers, and covered by a completed checkpoint. It preserves the current 
failover and restore behavior and does not change the state or serializer 
format. I implemented and validated it locally, including TaskManager and 
JobManager failover during the binlog phase.
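
To make the release condition concrete, here is a simplified, self-contained 
sketch of the idea. It is not the actual patch: {{ReleasingAssignerSketch}} 
and all of its members are hypothetical stand-ins, the maps use plain strings 
instead of the real split types, and {{markStreamSplitAssigned()}} is assumed 
to be called only after the stream split has been handed to a reader.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch, not Flink CDC code: drops snapshot metadata once it is safe. */
final class ReleasingAssignerSketch {
    // Stand-ins for the heavyweight maps named in the issue.
    private final Map<String, String> assignedSplits = new HashMap<>();
    private final Map<String, Long> splitFinishedOffsets = new HashMap<>();

    private boolean streamSplitAssigned = false;
    // First checkpoint whose snapshot was taken after the stream split was assigned.
    private Long firstCoveringCheckpointId = null;

    void onFinishedSplit(String splitId, String splitInfo, long offset) {
        assignedSplits.put(splitId, splitInfo);
        splitFinishedOffsets.put(splitId, offset);
    }

    /** Assumed to be called once the stream split has been handed to a reader. */
    void markStreamSplitAssigned() {
        streamSplitAssigned = true;
    }

    /** Returns how many snapshot entries this checkpoint would have to serialize. */
    int snapshotState(long checkpointId) {
        if (streamSplitAssigned && firstCoveringCheckpointId == null) {
            firstCoveringCheckpointId = checkpointId;
        }
        return assignedSplits.size() + splitFinishedOffsets.size();
    }

    /** Releases the maps only when a checkpoint covering the assignment completes. */
    void notifyCheckpointComplete(long checkpointId) {
        if (firstCoveringCheckpointId != null && checkpointId >= firstCoveringCheckpointId) {
            assignedSplits.clear();
            splitFinishedOffsets.clear();
        }
    }
}
```

In this sketch, a checkpoint that completes before the stream split was 
assigned leaves the maps untouched, so a restore from it still sees the full 
snapshot state. Only a completed checkpoint that was taken after the 
assignment triggers the release.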

If this approach looks good, I would appreciate being assigned to this ticket 
so I can open a PR.

> Snapshot split metadata is not released after switching to stream phase, 
> causing large JobManager coordinator state and OOM with many snapshot splits
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39775
>                 URL: https://issues.apache.org/jira/browse/FLINK-39775
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: yuanfenghu
>            Priority: Major
>
> h2. Description
> We observed a JobManager OOM when running a Flink CDC incremental snapshot 
> job with a large number of snapshot splits.
> The job can finish the snapshot phase and switch to the stream/binlog phase 
> successfully. However, the snapshot split metadata appears to remain in the 
> source coordinator state after the stream split has been assigned. In 
> large-table scenarios, subsequent checkpoints continue to serialize the full 
> snapshot split state, which makes the JobManager memory usage stay high and 
> eventually causes OOM.
> h2. Environment
>  - Flink CDC version: 3.5.0
>  - Flink version: 1.20.x
>  - Connector: MySQL CDC / incremental snapshot
>  - Sink: Paimon
>  - Startup mode: initial
>  - `scan.incremental.snapshot.chunk.size`: default `8096`
>  - Table scale: one table has about 2.5 billion rows, and the whole job 
> contains many tables
> With the default chunk size, one 2.5B-row table can generate roughly 300K 
> snapshot splits. When multiple large tables are included in one job, the 
> number of finished snapshot splits becomes very large.
> h2. Actual Behavior
> After all snapshot splits are finished and the stream/binlog split is 
> assigned:
> 1. JobManager memory usage remains high.
> 2. Source coordinator state remains very large.
> 3. Checkpoints continue to carry large snapshot split metadata.
> 4. The job may eventually fail with JobManager OOM.
> h2. Expected Behavior
> Snapshot split metadata should only be retained while it is still required 
> for correctness, for example before the stream split is created, transferred 
> to readers, and covered by a completed checkpoint.
> After the source has safely entered the stream phase, the coordinator should 
> not continue to retain and checkpoint all heavyweight snapshot split maps 
> such as assigned snapshot splits and finished split offsets.
> h2. Code Analysis
> The issue seems related to the lifecycle of snapshot metadata in the split 
> assigner.
> In the generic source implementation:
>  - `SnapshotSplitAssigner` keeps the following structures in memory:
>   - `assignedSplits`
>   - `tableSchemas`
>   - `splitFinishedOffsets`
>   - `splitFinishedCheckpointIds`
>  - `onFinishedSplits()` appends finished split offsets into 
> `splitFinishedOffsets` and records checkpoint ids in 
> `splitFinishedCheckpointIds`.
>  - `snapshotState()` always creates `SnapshotPendingSplitsState` with the 
> full snapshot state:
>   - `alreadyProcessedTables`
>   - `remainingSplits`
>   - `assignedSplits`
>   - `tableSchemas`
>   - `splitFinishedOffsets`
>   - `splitFinishedCheckpointIds`
>   - `chunkSplitterState`
>  - `HybridSplitAssigner.snapshotState()` wraps this full snapshot state 
> inside `HybridPendingSplitsState`, even after the stream split has already 
> been assigned.
>  - `PendingSplitsStateSerializer.serializeHybridPendingSplitsState()` 
> serializes the full `SnapshotPendingSplitsState` first and then writes only 
> the `isStreamSplitAssigned` flag.
>  - `notifyCheckpointComplete()` removes entries from 
> `splitFinishedCheckpointIds`, but it does not remove the heavyweight metadata 
> in `assignedSplits` or `splitFinishedOffsets`.
> This means that after the snapshot phase is complete, large snapshot metadata 
> can still be retained in the live coordinator object and serialized into 
> later checkpoints.
> The same pattern also exists in the MySQL-specific legacy path:
>  - `MySqlSnapshotSplitAssigner`
>  - `MySqlHybridSplitAssigner`
>  - `PendingSplitsStateSerializer`
> For example, `MySqlHybridSplitAssigner.snapshotState()` still wraps 
> `snapshotSplitAssigner.snapshotState(checkpointId)` in 
> `HybridPendingSplitsState`, and the MySQL snapshot state serializer writes 
> `assignedSplits`, `splitFinishedOffsets`, and `tableSchemas`.
> h2. Direct Memory / Container OOM Risk
> There is another important factor that makes this issue easier to trigger in 
> containerized deployments.
> The snapshot split metadata is retained in the JobManager-side source 
> coordinator. During checkpointing, this large coordinator state needs to be 
> serialized and persisted/transferred. This may create multiple copies of the 
> same metadata in a short period of time, including:
>  - live Java objects in the coordinator, such as `assignedSplits` and 
> `splitFinishedOffsets`;
>  - serialized coordinator state bytes;
>  - direct/off-heap buffers used by RPC, network, or checkpoint storage 
> clients during state transfer.
> For the JobManager, Flink does not enable the JVM direct memory limit by default. 
> The config `jobmanager.memory.enable-jvm-direct-memory-limit` defaults to 
> `false`. When it is disabled, Flink does not set `-XX:MaxDirectMemorySize` 
> for the JobManager process, so direct memory usage may not fail early with a 
> JVM-level `OutOfMemoryError: Direct buffer memory`.
> In containerized environments, the container memory limit still applies to 
> the whole process RSS. Therefore, a large coordinator state may increase 
> direct/off-heap memory during checkpoint serialization or transfer and 
> eventually cause the container to be OOMKilled, even if the Java heap itself 
> does not obviously exceed its limit.
> This makes the retained snapshot split metadata more dangerous: it is not 
> only a large heap/coordinator-state problem, but can also amplify JobManager 
> direct memory pressure during every checkpoint.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
