[https://issues.apache.org/jira/browse/FLINK-39775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18084258#comment-18084258]
Spoorthi Basu commented on FLINK-39775:
---------------------------------------
I have gone ahead and opened a PR here:
[https://github.com/apache/flink-cdc/pull/4418]. Happy to adjust based on any
feedback.
> Snapshot split metadata is not released after switching to stream phase,
> causing large JobManager coordinator state and OOM with many snapshot splits
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39775
> URL: https://issues.apache.org/jira/browse/FLINK-39775
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Reporter: yuanfenghu
> Priority: Major
> Labels: pull-request-available
>
> h2. Description
> We observed JobManager OOMs when running a Flink CDC incremental snapshot
> that produces a large number of snapshot splits.
> The job finishes the snapshot phase and switches to the stream/binlog phase
> successfully. However, the snapshot split metadata appears to remain in the
> source coordinator state after the stream split has been assigned. In
> large-table scenarios, every subsequent checkpoint still serializes the full
> snapshot split state, which keeps JobManager memory usage high and
> eventually causes an OOM.
> Environment
> - Flink CDC version: 3.5.0
> - Flink version: 1.20.x
> - Connector: MySQL CDC / incremental snapshot
> - Sink: Paimon
> - Startup mode: initial
> - `scan.incremental.snapshot.chunk.size`: default `8096`
> - Table scale: one table has about 2.5 billion rows, and the whole job
> contains many tables
> With the default chunk size, one 2.5B-row table can generate roughly 300K
> snapshot splits. When multiple large tables are included in one job, the
> number of finished snapshot splits becomes very large.
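As a quick sanity check on the ~300K figure, the split count can be estimated by ceiling division of the row count by the chunk size. This sketch assumes an evenly distributed primary key, so that every chunk holds about `chunkSize` rows; the actual chunk splitter may produce a different count for skewed or non-numeric keys. The class and method names are illustrative only.

```java
/**
 * Rough estimate of how many snapshot splits one table produces.
 * Assumption: keys are evenly distributed, so each chunk holds about
 * chunkSize rows. This is an order-of-magnitude check, not the splitter logic.
 */
public class SplitCountEstimate {

    // Ceiling division: a final partial chunk still becomes its own split.
    static long estimateSplits(long rows, long chunkSize) {
        if (rows <= 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("rows and chunkSize must be positive");
        }
        return (rows + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        long chunkSize = 8096L;      // default scan.incremental.snapshot.chunk.size
        long rows = 2_500_000_000L;  // ~2.5B rows, as in the report
        System.out.println(estimateSplits(rows, chunkSize)); // 308795
    }
}
```

Every one of those splits contributes an entry to each per-split map in the coordinator, and the counts add up across all large tables in the job.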
> h2. Actual Behavior
> After all snapshot splits are finished and the stream/binlog split is
> assigned:
> 1. JobManager memory usage remains high.
> 2. Source coordinator state remains very large.
> 3. Checkpoints continue to carry large snapshot split metadata.
> 4. The job may eventually fail with JobManager OOM.
> h2. Expected Behavior
> Snapshot split metadata should be retained only while it is still required
> for correctness, for example until the stream split has been created,
> transferred to readers, and covered by a completed checkpoint.
> After the source has safely entered the stream phase, the coordinator should
> stop retaining and checkpointing heavyweight snapshot split maps such as the
> assigned snapshot splits and the finished split offsets.
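The release condition in the expected behavior can be sketched as a small state machine. This is a hypothetical illustration, not Flink CDC code: the class, its methods, and the single `splitFinishedOffsets` map standing in for all heavyweight maps are assumptions. It keeps per-split metadata until the stream split is assigned and a checkpoint taken after that assignment completes, and it assumes that a stream split added back before then may need to be rebuilt from the metadata.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch (not Flink CDC code): per-split snapshot metadata is
 * released only after the stream split is assigned AND a checkpoint taken
 * after that assignment has completed. All names are illustrative.
 */
public class SnapshotMetadataLifecycle {

    // Stand-in for heavyweight maps such as assignedSplits / splitFinishedOffsets.
    private final Map<String, Long> splitFinishedOffsets = new HashMap<>();

    private boolean streamSplitAssigned = false;
    // First checkpoint whose state records the stream split as assigned;
    // null while no such checkpoint has been taken.
    private Long releaseCheckpointId = null;

    void onSplitFinished(String splitId, long offset) {
        splitFinishedOffsets.put(splitId, offset);
    }

    void onStreamSplitAssigned() {
        streamSplitAssigned = true;
    }

    // Assumption: if the stream split comes back before the release checkpoint
    // completes, it may have to be rebuilt, so the metadata must be kept.
    void onStreamSplitAddedBack() {
        streamSplitAssigned = false;
        releaseCheckpointId = null;
    }

    void snapshotState(long checkpointId) {
        if (streamSplitAssigned && releaseCheckpointId == null) {
            releaseCheckpointId = checkpointId;
        }
    }

    void notifyCheckpointComplete(long checkpointId) {
        if (releaseCheckpointId != null && checkpointId >= releaseCheckpointId) {
            splitFinishedOffsets.clear(); // safe point reached: drop the metadata
        }
    }

    int retainedEntries() {
        return splitFinishedOffsets.size();
    }

    public static void main(String[] args) {
        SnapshotMetadataLifecycle assigner = new SnapshotMetadataLifecycle();
        assigner.onSplitFinished("orders:0", 100L);
        assigner.onSplitFinished("orders:1", 200L);
        assigner.onStreamSplitAssigned();
        assigner.notifyCheckpointComplete(1L); // no checkpoint taken after assignment yet
        System.out.println(assigner.retainedEntries()); // 2
        assigner.snapshotState(2L);            // first checkpoint after assignment
        assigner.notifyCheckpointComplete(2L);
        System.out.println(assigner.retainedEntries()); // 0
    }
}
```

Anchoring the release to a checkpoint id, rather than to the assignment event itself, keeps the metadata available for any restore or add-back that could still need to recreate the stream split.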
> h2. Code Analysis
> The issue seems related to the lifecycle of snapshot metadata in the split
> assigner.
> In the generic source implementation:
> - `SnapshotSplitAssigner` keeps the following structures in memory:
>   - `assignedSplits`
>   - `tableSchemas`
>   - `splitFinishedOffsets`
>   - `splitFinishedCheckpointIds`
> - `onFinishedSplits()` appends finished split offsets into
> `splitFinishedOffsets` and records checkpoint ids in
> `splitFinishedCheckpointIds`.
> - `snapshotState()` always creates `SnapshotPendingSplitsState` with the
> full snapshot state:
>   - `alreadyProcessedTables`
>   - `remainingSplits`
>   - `assignedSplits`
>   - `tableSchemas`
>   - `splitFinishedOffsets`
>   - `splitFinishedCheckpointIds`
>   - `chunkSplitterState`
> - `HybridSplitAssigner.snapshotState()` wraps this full snapshot state
> inside `HybridPendingSplitsState`, even after the stream split has already
> been assigned.
> - `PendingSplitsStateSerializer.serializeHybridPendingSplitsState()`
> serializes the full `SnapshotPendingSplitsState` first, then only writes the
> `isStreamSplitAssigned` flag.
> - `notifyCheckpointComplete()` removes entries from
> `splitFinishedCheckpointIds`, but it does not remove the heavyweight metadata
> in `assignedSplits` or `splitFinishedOffsets`.
> This means that after the snapshot phase is complete, large snapshot metadata
> can still be retained in the live coordinator object and serialized into
> later checkpoints.
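The effect described above can be reproduced with a simplified model. This is not the real `SnapshotSplitAssigner` / `PendingSplitsStateSerializer` code: the class is hypothetical, it tracks only `splitFinishedOffsets` and `splitFinishedCheckpointIds`, and it passes the checkpoint id directly into `onFinishedSplits`. It mirrors the reported order of writing the full snapshot state before the `isStreamSplitAssigned` flag, and it shows that neither setting the flag nor completing a checkpoint shrinks the serialized state.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified model (not the real Flink CDC classes). The checkpointed state
 * always embeds every finished split offset and only appends a boolean flag;
 * checkpoint completion prunes splitFinishedCheckpointIds but never the offsets.
 */
public class RetainedSnapshotStateModel {

    final Map<String, Long> splitFinishedOffsets = new HashMap<>();
    final Map<String, Long> splitFinishedCheckpointIds = new HashMap<>();
    boolean isStreamSplitAssigned = false;

    // Simplification: the checkpoint id is passed in directly.
    void onFinishedSplits(Map<String, Long> offsets, long checkpointId) {
        splitFinishedOffsets.putAll(offsets);
        for (String splitId : offsets.keySet()) {
            splitFinishedCheckpointIds.put(splitId, checkpointId);
        }
    }

    // Full snapshot state first, then the single isStreamSplitAssigned flag.
    byte[] snapshotState() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(splitFinishedOffsets.size());
            for (Map.Entry<String, Long> e : splitFinishedOffsets.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
            out.writeBoolean(isStreamSplitAssigned);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    void notifyCheckpointComplete(long checkpointId) {
        // Only the checkpoint-id bookkeeping shrinks; the offsets stay.
        splitFinishedCheckpointIds.values().removeIf(id -> id <= checkpointId);
    }

    public static void main(String[] args) {
        RetainedSnapshotStateModel assigner = new RetainedSnapshotStateModel();
        Map<String, Long> finished = new HashMap<>();
        for (int i = 0; i < 10_000; i++) {
            finished.put("orders:" + i, (long) i);
        }
        assigner.onFinishedSplits(finished, 1L);
        int before = assigner.snapshotState().length;

        assigner.isStreamSplitAssigned = true; // stream phase reached
        assigner.notifyCheckpointComplete(1L);
        int after = assigner.snapshotState().length;

        System.out.println(before == after); // true: nothing was released
    }
}
```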
> The same pattern also exists in the MySQL-specific legacy path:
> - `MySqlSnapshotSplitAssigner`
> - `MySqlHybridSplitAssigner`
> - `PendingSplitsStateSerializer`
> For example, `MySqlHybridSplitAssigner.snapshotState()` still wraps
> `snapshotSplitAssigner.snapshotState(checkpointId)` in
> `HybridPendingSplitsState`, and the MySQL snapshot state serializer writes
> `assignedSplits`, `splitFinishedOffsets`, and `tableSchemas`.
> h2. Direct Memory / Container OOM Risk
> There is another important factor that makes this issue easier to trigger in
> containerized deployments.
> The snapshot split metadata is retained in the JobManager-side source
> coordinator. During checkpointing, this large coordinator state needs to be
> serialized and persisted/transferred. This may create multiple copies of the
> same metadata in a short period of time, including:
> - live Java objects in the coordinator, such as `assignedSplits` and
> `splitFinishedOffsets`;
> - serialized coordinator state bytes;
> - direct/off-heap buffers used by RPC, network, or checkpoint storage
> clients during state transfer.
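The copies listed above can be illustrated with plain JDK classes. This sketch does not reproduce Flink's actual checkpoint or RPC path. Which buffers those clients really allocate is an assumption, and the class name is hypothetical. It shows one logical state existing as live heap objects, as a serialized heap `byte[]`, and as an off-heap copy in a direct `ByteBuffer`, all reachable at the same time.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/**
 * Illustration only (plain JDK, not Flink's checkpoint path): the same
 * logical state held as live objects, heap bytes, and off-heap bytes.
 */
public class StateCopies {

    static byte[] serialize(Map<String, Long> state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(state.size());
            for (Map.Entry<String, Long> e : state.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static ByteBuffer copyToDirect(byte[] serialized) {
        // Direct buffers are allocated outside the Java heap: they do not show
        // up as heap usage, but they do count toward the process RSS.
        ByteBuffer direct = ByteBuffer.allocateDirect(serialized.length);
        direct.put(serialized);
        direct.flip();
        return direct;
    }

    public static void main(String[] args) {
        Map<String, Long> live = new HashMap<>();      // copy 1: live objects
        for (int i = 0; i < 100_000; i++) {
            live.put("split-" + i, (long) i);
        }
        byte[] serialized = serialize(live);           // copy 2: heap bytes
        ByteBuffer direct = copyToDirect(serialized);  // copy 3: off-heap bytes
        System.out.printf("serialized=%d bytes, direct=%d bytes%n",
                serialized.length, direct.remaining());
    }
}
```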
> For JobManager, Flink does not enable the JVM direct memory limit by default.
> The config `jobmanager.memory.enable-jvm-direct-memory-limit` defaults to
> `false`. When it is disabled, Flink does not set `-XX:MaxDirectMemorySize`
> for the JobManager process, so direct memory usage may not fail early with a
> JVM-level `OutOfMemoryError: Direct buffer memory`.
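The documented behavior of `jobmanager.memory.enable-jvm-direct-memory-limit` is that, when it is `true`, the JobManager JVM gets `-XX:MaxDirectMemorySize` set to the value of `jobmanager.memory.off-heap.size` (default 128 MB). The helper below is a hypothetical sketch of that rule, not Flink's launcher code, and the exact formatting of the flag value is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper mimicking the documented JobManager rule: emit
 * -XX:MaxDirectMemorySize only when the direct-memory limit is enabled,
 * using the configured JobManager off-heap size as the value.
 */
public class JobManagerDirectMemoryFlag {

    static List<String> directMemoryJvmArgs(boolean enableJvmDirectMemoryLimit, long offHeapBytes) {
        List<String> args = new ArrayList<>();
        if (enableJvmDirectMemoryLimit) {
            args.add("-XX:MaxDirectMemorySize=" + offHeapBytes);
        }
        // When disabled (the default), no flag is added and the JVM applies its
        // own default direct-memory limit (on HotSpot, roughly the max heap size).
        return args;
    }

    public static void main(String[] args) {
        long offHeap = 128L * 1024 * 1024; // jobmanager.memory.off-heap.size default: 128 MB
        System.out.println(directMemoryJvmArgs(false, offHeap)); // []
        System.out.println(directMemoryJvmArgs(true, offHeap));  // [-XX:MaxDirectMemorySize=134217728]
    }
}
```

Enabling the limit does not shrink the retained metadata. It only turns an unbounded RSS increase into an earlier, JVM-level `OutOfMemoryError: Direct buffer memory`, which is easier to diagnose than a container OOMKill.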
> In containerized environments, the container memory limit still applies to
> the whole process RSS. Therefore, a large coordinator state may increase
> direct/off-heap memory during checkpoint serialization or transfer and
> eventually cause the container to be OOMKilled, even if the Java heap itself
> does not obviously exceed its limit.
> This makes the retained snapshot split metadata more dangerous: it is not
> only a large heap/coordinator-state problem, but can also amplify JobManager
> direct memory pressure during every checkpoint.
>