[ 
https://issues.apache.org/jira/browse/FLINK-39775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18084258#comment-18084258
 ] 

Spoorthi Basu commented on FLINK-39775:
---------------------------------------

I have gone ahead and opened a PR here: 
[https://github.com/apache/flink-cdc/pull/4418]. Happy to adjust based on any 
feedback.

> Snapshot split metadata is not released after switching to stream phase, 
> causing large JobManager coordinator state and OOM with many snapshot splits
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39775
>                 URL: https://issues.apache.org/jira/browse/FLINK-39775
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: yuanfenghu
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Description
> We observed a JobManager OOM issue when running Flink CDC incremental 
> snapshot with a large number of snapshot splits.
> The job can finish the snapshot phase and switch to the stream/binlog phase 
> successfully. However, the snapshot split metadata appears to remain in the 
> source coordinator state after the stream split has been assigned. In 
> large-table scenarios, subsequent checkpoints continue to serialize the full 
> snapshot split state, which makes the JobManager memory usage stay high and 
> eventually causes OOM.
> h2. Environment
>  - Flink CDC version: 3.5.0
>  - Flink version: 1.20.x
>  - Connector: MySQL CDC / incremental snapshot
>  - Sink: Paimon
>  - Startup mode: initial
>  - `scan.incremental.snapshot.chunk.size`: default `8096`
>  - Table scale: one table has about 2.5 billion rows, and the whole job 
> contains many tables
> With the default chunk size, one 2.5B-row table can generate roughly 300K 
> snapshot splits. When multiple large tables are included in one job, the 
> number of finished snapshot splits becomes very large.
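
The "roughly 300K" figure can be checked with a minimal sketch, assuming evenly sized chunks so that the split count is the row count divided by the chunk size, rounded up. The class and method names are hypothetical and are not part of Flink CDC. The real splitter may produce a different count depending on the key distribution.

```java
/** Hypothetical helper for a back-of-the-envelope estimate; not a Flink CDC class. */
class SplitCountEstimate {

    /** Ceiling of rowCount / chunkSize, i.e. the split count for evenly sized chunks. */
    static long estimateSplits(long rowCount, long chunkSize) {
        if (rowCount < 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("rowCount must be >= 0 and chunkSize > 0");
        }
        return (rowCount + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        // 2.5 billion rows with the default chunk size of 8096 rows.
        long splits = estimateSplits(2_500_000_000L, 8096L);
        System.out.println("estimated snapshot splits: " + splits); // prints 308795
    }
}
```

Each of these splits contributes entries to the coordinator-side maps, so the retained state grows linearly with the number of large tables in the job.
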
> h2. Actual Behavior
> After all snapshot splits are finished and the stream/binlog split is 
> assigned:
> 1. JobManager memory usage remains high.
> 2. Source coordinator state remains very large.
> 3. Checkpoints continue to carry large snapshot split metadata.
> 4. The job may eventually fail with JobManager OOM.
> h2. Expected Behavior
> Snapshot split metadata should only be retained while it is still required 
> for correctness, for example until the stream split has been created, 
> transferred to readers, and covered by a completed checkpoint.
> After the source has safely entered the stream phase, the coordinator should 
> not continue to retain and checkpoint all heavyweight snapshot split maps 
> such as assigned snapshot splits and finished split offsets.
> h2. Code Analysis
> The issue seems related to the lifecycle of snapshot metadata in the split 
> assigner.
> In the generic source implementation:
>  - `SnapshotSplitAssigner` keeps the following structures in memory:
>   - `assignedSplits`
>   - `tableSchemas`
>   - `splitFinishedOffsets`
>   - `splitFinishedCheckpointIds`
>  - `onFinishedSplits()` appends finished split offsets into 
> `splitFinishedOffsets` and records checkpoint ids in 
> `splitFinishedCheckpointIds`.
>  - `snapshotState()` always creates `SnapshotPendingSplitsState` with the 
> full snapshot state:
>   - `alreadyProcessedTables`
>   - `remainingSplits`
>   - `assignedSplits`
>   - `tableSchemas`
>   - `splitFinishedOffsets`
>   - `splitFinishedCheckpointIds`
>   - `chunkSplitterState`
>  - `HybridSplitAssigner.snapshotState()` wraps this full snapshot state 
> inside `HybridPendingSplitsState`, even after the stream split has already 
> been assigned.
>  - `PendingSplitsStateSerializer.serializeHybridPendingSplitsState()` 
> serializes the full `SnapshotPendingSplitsState` first and then appends only 
> the `isStreamSplitAssigned` flag.
>  - `notifyCheckpointComplete()` removes entries from 
> `splitFinishedCheckpointIds`, but it does not remove the heavyweight metadata 
> in `assignedSplits` or `splitFinishedOffsets`.
> This means that after the snapshot phase is complete, large snapshot metadata 
> can still be retained in the live coordinator object and serialized into 
> later checkpoints.
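
The lifecycle gap described above can be illustrated with a minimal, self-contained sketch. It is not the Flink CDC implementation and not necessarily the approach taken in the linked PR. The class `ReleasableSnapshotMetadata` and all of its methods are hypothetical stand-ins that only borrow the field names from the analysis. The assumption is that the heavyweight maps can be dropped once a checkpoint taken after the stream split was assigned has completed. A real fix would also have to account for anything that still reads this metadata later, which this sketch ignores.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a split assigner; not a Flink CDC class. */
class ReleasableSnapshotMetadata {
    private final Map<String, String> assignedSplits = new HashMap<>();
    private final Map<String, Long> splitFinishedOffsets = new HashMap<>();
    private boolean streamSplitAssigned = false;
    // Id of the first checkpoint snapshotted after the stream split was assigned; -1 until known.
    private long firstCheckpointAfterStreamAssigned = -1L;

    /** Records a finished snapshot split and its offset. */
    void onFinishedSplit(String splitId, String splitInfo, long offset) {
        assignedSplits.put(splitId, splitInfo);
        splitFinishedOffsets.put(splitId, offset);
    }

    void markStreamSplitAssigned() {
        streamSplitAssigned = true;
    }

    /** Returns how many snapshot-split entries this checkpoint would serialize. */
    int snapshotState(long checkpointId) {
        if (streamSplitAssigned && firstCheckpointAfterStreamAssigned < 0) {
            firstCheckpointAfterStreamAssigned = checkpointId;
        }
        return assignedSplits.size() + splitFinishedOffsets.size();
    }

    /** Releases the maps once a checkpoint that already covers the stream phase completes. */
    void notifyCheckpointComplete(long checkpointId) {
        if (firstCheckpointAfterStreamAssigned >= 0
                && checkpointId >= firstCheckpointAfterStreamAssigned) {
            assignedSplits.clear();
            splitFinishedOffsets.clear();
        }
    }
}
```

In this sketch, checkpoints taken before the release still carry the full metadata, and every checkpoint after the release serializes zero snapshot-split entries.
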
> The same pattern also exists in the MySQL-specific legacy path:
>  - `MySqlSnapshotSplitAssigner`
>  - `MySqlHybridSplitAssigner`
>  - `PendingSplitsStateSerializer`
> For example, `MySqlHybridSplitAssigner.snapshotState()` still wraps 
> `snapshotSplitAssigner.snapshotState(checkpointId)` in 
> `HybridPendingSplitsState`, and the MySQL snapshot state serializer writes 
> `assignedSplits`, `splitFinishedOffsets`, and `tableSchemas`.
> h2. Direct Memory / Container OOM Risk
> There is another important factor that makes this issue easier to trigger in 
> containerized deployments.
> The snapshot split metadata is retained in the JobManager-side source 
> coordinator. During checkpointing, this large coordinator state needs to be 
> serialized and persisted/transferred. This may create multiple copies of the 
> same metadata in a short period of time, including:
>  - live Java objects in the coordinator, such as `assignedSplits` and 
> `splitFinishedOffsets`;
>  - serialized coordinator state bytes;
>  - direct/off-heap buffers used by RPC, network, or checkpoint storage 
> clients during state transfer.
> For the JobManager, Flink does not enable the JVM direct memory limit by 
> default: the config `jobmanager.memory.enable-jvm-direct-memory-limit` 
> defaults to `false`. When it is disabled, Flink does not set 
> `-XX:MaxDirectMemorySize` for the JobManager process, so direct memory usage 
> may not fail early with a JVM-level `OutOfMemoryError: Direct buffer memory`.
> In containerized environments, the container memory limit still applies to 
> the whole process RSS. Therefore, a large coordinator state may increase 
> direct/off-heap memory during checkpoint serialization or transfer and 
> eventually cause the container to be OOMKilled, even if the Java heap itself 
> does not obviously exceed its limit.
> This makes the retained snapshot split metadata more dangerous: it is not 
> only a large heap/coordinator-state problem, but can also amplify JobManager 
> direct memory pressure during every checkpoint.
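
The "multiple copies" effect can be reproduced in miniature with plain JDK classes. This is only an illustrative sketch: the class `CheckpointCopiesSketch`, its methods, and the wire format are hypothetical and do not reflect how Flink serializes or transfers coordinator state. It shows the same map existing as live objects, as a serialized `byte[]`, and as a direct buffer at the same time.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Map;

/** Hypothetical illustration; not Flink's serializer or transfer path. */
class CheckpointCopiesSketch {

    /** Copy 2: heap bytes holding a serialized form of the live map (copy 1). */
    static byte[] serialize(Map<String, Long> splitFinishedOffsets) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(splitFinishedOffsets.size());
            for (Map.Entry<String, Long> entry : splitFinishedOffsets.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeLong(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Copy 3: the same bytes in off-heap memory, as a transfer client might hold them. */
    static ByteBuffer toDirectBuffer(byte[] serialized) {
        ByteBuffer direct = ByteBuffer.allocateDirect(serialized.length);
        direct.put(serialized);
        direct.flip();
        return direct;
    }
}
```

The direct buffer is allocated outside the Java heap, so it counts against the container memory limit but not against `-Xmx`, which matches the OOMKilled scenario described above.
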
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
