yuanfenghu created FLINK-39775:
----------------------------------

             Summary: Snapshot split metadata is not released after switching 
to stream phase, causing large JobManager coordinator state and OOM with many 
snapshot splits
                 Key: FLINK-39775
                 URL: https://issues.apache.org/jira/browse/FLINK-39775
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.5.0
            Reporter: yuanfenghu


h2. Description

We observed a JobManager OOM when running a Flink CDC incremental snapshot job
with a large number of snapshot splits.

The job finishes the snapshot phase and switches to the stream/binlog phase
successfully. However, the snapshot split metadata appears to remain in the
source coordinator state after the stream split has been assigned. In
large-table scenarios, subsequent checkpoints continue to serialize the full
snapshot split state, which keeps JobManager memory usage high and
eventually causes OOM.

h2. Environment

- Flink CDC version: 3.5.0
- Flink version: 1.20.x
- Connector: MySQL CDC / incremental snapshot
- Sink: Paimon
- Startup mode: initial
- `scan.incremental.snapshot.chunk.size`: default `8096`
- Table scale: one table has about 2.5 billion rows, and the whole job contains 
many tables

With the default chunk size, one 2.5B-row table can generate roughly 300K 
snapshot splits. When multiple large tables are included in one job, the number 
of finished snapshot splits becomes very large.
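
As a rough check of the split count above, here is a minimal sketch that assumes chunks are evenly sized and that every chunk holds exactly `scan.incremental.snapshot.chunk.size` rows. The real chunk splitter may produce a different number depending on key distribution; the class and method names are illustrative only.

```java
public final class SplitCountEstimate {

    /** Approximate number of snapshot splits: ceil(rowCount / chunkSize). */
    static long estimateSplits(long rowCount, int chunkSize) {
        if (rowCount < 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("rowCount must be >= 0 and chunkSize > 0");
        }
        // Integer ceiling division without floating point.
        return (rowCount + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        // 2.5 billion rows with the default chunk size of 8096.
        System.out.println(estimateSplits(2_500_000_000L, 8096));
    }
}
```

For 2.5 billion rows and a chunk size of 8096 this gives 308,795 splits for a single table, which matches the "roughly 300K" figure.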

h2. Actual Behavior

After all snapshot splits are finished and the stream/binlog split is assigned:

1. JobManager memory usage remains high.
2. Source coordinator state remains very large.
3. Checkpoints continue to carry large snapshot split metadata.
4. The job may eventually fail with JobManager OOM.

h2. Expected Behavior

Snapshot split metadata should be retained only while it is still required for
correctness, for example until the stream split has been created, transferred
to readers, and covered by a completed checkpoint.

After the source has safely entered the stream phase, the coordinator should 
not continue to retain and checkpoint all heavyweight snapshot split maps such 
as assigned snapshot splits and finished split offsets.
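
The expected lifecycle could be sketched roughly as follows. This is a simplified, hypothetical model and not the Flink CDC API: `SnapshotMetadataReleaseSketch`, its fields, and its methods are invented for illustration, splits are reduced to strings, and the sketch ignores any feature that may still need the snapshot metadata after the switch to the stream phase.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of an assigner that frees snapshot metadata once it is safe. */
public class SnapshotMetadataReleaseSketch {
    // Stand-ins for the heavyweight maps named in this issue.
    private final Map<String, String> assignedSplits = new HashMap<>();
    private final Map<String, Long> splitFinishedOffsets = new HashMap<>();

    private boolean streamSplitAssigned;
    // First checkpoint taken after the stream split was assigned; -1 means none yet.
    private long firstCheckpointAfterStreamAssignment = -1L;

    void onSplitFinished(String splitId, long finishedOffset) {
        assignedSplits.put(splitId, "split-metadata-for-" + splitId);
        splitFinishedOffsets.put(splitId, finishedOffset);
    }

    void onStreamSplitAssigned() {
        streamSplitAssigned = true;
    }

    /** Returns how many map entries this checkpoint would have to carry. */
    int snapshotState(long checkpointId) {
        if (streamSplitAssigned && firstCheckpointAfterStreamAssignment < 0) {
            firstCheckpointAfterStreamAssignment = checkpointId;
        }
        return retainedEntries();
    }

    /** Releases the maps only after a post-assignment checkpoint has completed. */
    void notifyCheckpointComplete(long checkpointId) {
        boolean covered = firstCheckpointAfterStreamAssignment >= 0
                && checkpointId >= firstCheckpointAfterStreamAssignment;
        if (covered) {
            assignedSplits.clear();
            splitFinishedOffsets.clear();
        }
    }

    int retainedEntries() {
        return assignedSplits.size() + splitFinishedOffsets.size();
    }
}
```

The point of the two-step condition is that metadata is dropped only once a checkpoint that already contains the stream split assignment has completed, so a restore from any later checkpoint does not need the snapshot maps.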

h2. Code Analysis

The issue seems related to the lifecycle of snapshot metadata in the split 
assigner.

In the generic source implementation:

- `SnapshotSplitAssigner` keeps the following structures in memory:

  - `assignedSplits`
  - `tableSchemas`
  - `splitFinishedOffsets`
  - `splitFinishedCheckpointIds`

- `onFinishedSplits()` appends finished split offsets into 
`splitFinishedOffsets` and records checkpoint ids in 
`splitFinishedCheckpointIds`.

- `snapshotState()` always creates `SnapshotPendingSplitsState` with the full 
snapshot state:

  - `alreadyProcessedTables`
  - `remainingSplits`
  - `assignedSplits`
  - `tableSchemas`
  - `splitFinishedOffsets`
  - `splitFinishedCheckpointIds`
  - `chunkSplitterState`

- `HybridSplitAssigner.snapshotState()` wraps this full snapshot state inside 
`HybridPendingSplitsState`, even after the stream split has already been 
assigned.

- `PendingSplitsStateSerializer.serializeHybridPendingSplitsState()` serializes
the full `SnapshotPendingSplitsState` first, then writes only the
`isStreamSplitAssigned` flag.

- `notifyCheckpointComplete()` removes entries from 
`splitFinishedCheckpointIds`, but it does not remove the heavyweight metadata 
in `assignedSplits` or `splitFinishedOffsets`.

This means that after the snapshot phase is complete, large snapshot metadata 
can still be retained in the live coordinator object and serialized into later 
checkpoints.

The same pattern also exists in the MySQL-specific legacy path:

- `MySqlSnapshotSplitAssigner`
- `MySqlHybridSplitAssigner`
- `PendingSplitsStateSerializer`

For example, `MySqlHybridSplitAssigner.snapshotState()` still wraps 
`snapshotSplitAssigner.snapshotState(checkpointId)` in 
`HybridPendingSplitsState`, and the MySQL snapshot state serializer writes 
`assignedSplits`, `splitFinishedOffsets`, and `tableSchemas`.
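
To illustrate why the checkpoint size does not shrink after the switch, here is a simplified model of the serialization order described above: the full snapshot state first, then the flag. It is not the actual `PendingSplitsStateSerializer`; the wire format, the class `HybridStateSizeSketch`, and the sample data are assumptions made for the sketch.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model: the flag is appended, but the full map is always written. */
public class HybridStateSizeSketch {

    static byte[] serialize(Map<String, Long> splitFinishedOffsets,
                            boolean isStreamSplitAssigned) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            // Full snapshot state is written regardless of the phase.
            out.writeInt(splitFinishedOffsets.size());
            for (Map.Entry<String, Long> e : splitFinishedOffsets.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
            // Only a single byte depends on the stream phase.
            out.writeBoolean(isStreamSplitAssigned);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Builds n made-up split ids with dummy offsets. */
    static Map<String, Long> sampleOffsets(int n) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (int i = 0; i < n; i++) {
            offsets.put("db.table:" + i, (long) i);
        }
        return offsets;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = sampleOffsets(300_000);
        System.out.println("before stream assignment: "
                + serialize(offsets, false).length + " bytes");
        System.out.println("after stream assignment:  "
                + serialize(offsets, true).length + " bytes");
    }
}
```

In this model both calls produce the same number of bytes, because the flag changes one byte's value and nothing else.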

h2. Direct Memory / Container OOM Risk

There is another important factor that makes this issue easier to trigger in 
containerized deployments.

The snapshot split metadata is retained in the JobManager-side source 
coordinator. During checkpointing, this large coordinator state needs to be 
serialized and persisted/transferred. This may create multiple copies of the 
same metadata in a short period of time, including:

- live Java objects in the coordinator, such as `assignedSplits` and 
`splitFinishedOffsets`;
- serialized coordinator state bytes;
- direct/off-heap buffers used by RPC, network, or checkpoint storage clients 
during state transfer.

For the JobManager, Flink does not enable the JVM direct memory limit by default. 
The config `jobmanager.memory.enable-jvm-direct-memory-limit` defaults to 
`false`. When it is disabled, Flink does not set `-XX:MaxDirectMemorySize` for 
the JobManager process, so direct memory usage may not fail early with a 
JVM-level `OutOfMemoryError: Direct buffer memory`.
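
One way to confirm whether a JVM was started with a direct memory limit is to look for `-XX:MaxDirectMemorySize` in its input arguments. The sketch below uses only the standard `java.lang.management` API and inspects the JVM it runs in; the class `DirectMemoryLimitCheck` is hypothetical, and the same parsing would have to be applied to the argument list of the actual JobManager process.

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Optional;

/** Reports whether -XX:MaxDirectMemorySize appears in a JVM argument list. */
public class DirectMemoryLimitCheck {
    private static final String FLAG = "-XX:MaxDirectMemorySize=";

    static Optional<String> findMaxDirectMemorySize(List<String> jvmArgs) {
        String found = null;
        for (String arg : jvmArgs) {
            if (arg.startsWith(FLAG)) {
                // If the flag is repeated, keep the last occurrence.
                found = arg.substring(FLAG.length());
            }
        }
        return Optional.ofNullable(found);
    }

    public static void main(String[] args) {
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        System.out.println(findMaxDirectMemorySize(jvmArgs)
                .map(v -> "direct memory limit: " + v)
                .orElse("no -XX:MaxDirectMemorySize set"));
    }
}
```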

In containerized environments, the container memory limit still applies to the 
whole process RSS. Therefore, a large coordinator state may increase 
direct/off-heap memory during checkpoint serialization or transfer and 
eventually cause the container to be OOMKilled, even if the Java heap itself 
does not obviously exceed its limit.

This makes the retained snapshot split metadata more dangerous: it is not only 
a large heap/coordinator-state problem, but can also amplify JobManager direct 
memory pressure during every checkpoint.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)