[ https://issues.apache.org/jira/browse/FLINK-39775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuanfenghu updated FLINK-39775:
-------------------------------
    Description: 
h2. Description

We observed a JobManager OOM when running a Flink CDC incremental snapshot 
with a large number of snapshot splits.

The job can finish the snapshot phase and switch to the stream/binlog phase 
successfully. However, the snapshot split metadata appears to remain in the 
source coordinator state after the stream split has been assigned. In 
large-table scenarios, subsequent checkpoints continue to serialize the full 
snapshot split state, which keeps JobManager memory usage high and eventually 
causes an OOM.

h2. Environment
 - Flink CDC version: 3.5.0
 - Flink version: 1.20.x
 - Connector: MySQL CDC / incremental snapshot
 - Sink: Paimon
 - Startup mode: initial
 - `scan.incremental.snapshot.chunk.size`: default `8096`
 - Table scale: one table has about 2.5 billion rows, and the whole job 
contains many tables

With the default chunk size, one 2.5B-row table can generate roughly 300K 
snapshot splits. When multiple large tables are included in one job, the number 
of finished snapshot splits becomes very large.
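
The split count above follows from ceiling division of the row count by the chunk size. A minimal sketch, assuming an evenly distributed primary key (uneven keys can produce a different number of chunks); `SplitEstimate` and `estimatedSplits` are hypothetical names, not Flink CDC APIs:

```java
// Hypothetical helper (not part of Flink CDC): estimates the number of
// snapshot splits for one table, assuming an evenly distributed primary key
// so that each chunk covers about `chunkSize` rows.
final class SplitEstimate {
    private SplitEstimate() {}

    /** Ceiling division: a final, partially filled chunk is still a split. */
    static long estimatedSplits(long rowCount, int chunkSize) {
        if (rowCount < 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("rowCount must be >= 0 and chunkSize > 0");
        }
        return (rowCount + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        long rows = 2_500_000_000L; // "about 2.5 billion rows" from the report
        int chunkSize = 8096;       // default scan.incremental.snapshot.chunk.size
        System.out.println(estimatedSplits(rows, chunkSize)); // prints 308795
    }
}
```

With several tables of this size in one job, the number of finished splits tracked by the coordinator grows roughly in proportion to the number of such tables.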
h2. Actual Behavior

After all snapshot splits are finished and the stream/binlog split is assigned:

1. JobManager memory usage remains high.
2. Source coordinator state remains very large.
3. Checkpoints continue to carry large snapshot split metadata.
4. The job may eventually fail with JobManager OOM.
h2. Expected Behavior

Snapshot split metadata should only be retained while it is still required for 
correctness, for example until the stream split has been created, transferred 
to readers, and covered by a completed checkpoint.

After the source has safely entered the stream phase, the coordinator should 
not continue to retain and checkpoint all heavyweight snapshot split maps such 
as assigned snapshot splits and finished split offsets.
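
The retention rule above can be expressed as a small state machine. This is a sketch under stated assumptions: `SnapshotMetadataRetention` and its methods are hypothetical (they do not exist in Flink CDC), and checkpoint IDs are assumed to increase monotonically, so completing any checkpoint at or after the first one taken after the stream split assignment means a completed checkpoint covers the stream phase.

```java
// Hypothetical sketch of the retention rule; these names do not exist in
// Flink CDC. Assumes checkpoint IDs increase monotonically.
final class SnapshotMetadataRetention {
    private static final long NONE = -1L;

    private boolean streamSplitAssigned;
    // First checkpoint whose state includes the stream split assignment.
    private long coveringCheckpointId = NONE;
    private boolean releasable;

    /** The stream split has been created and handed to a reader. */
    void onStreamSplitAssigned() {
        streamSplitAssigned = true;
    }

    /** Mirrors snapshotState(checkpointId). */
    void onSnapshotState(long checkpointId) {
        if (streamSplitAssigned && coveringCheckpointId == NONE) {
            coveringCheckpointId = checkpointId;
        }
    }

    /** Mirrors notifyCheckpointComplete(checkpointId). */
    void onCheckpointComplete(long checkpointId) {
        // Any completed checkpoint at or after the covering one includes the assignment.
        if (coveringCheckpointId != NONE && checkpointId >= coveringCheckpointId) {
            releasable = true;
        }
    }

    /** True once maps such as assignedSplits and splitFinishedOffsets could be dropped. */
    boolean canReleaseSnapshotMetadata() {
        return releasable;
    }

    public static void main(String[] args) {
        SnapshotMetadataRetention retention = new SnapshotMetadataRetention();
        retention.onSnapshotState(10L);      // taken while still in the snapshot phase
        retention.onStreamSplitAssigned();
        retention.onCheckpointComplete(10L); // checkpoint 10 predates the assignment
        System.out.println(retention.canReleaseSnapshotMetadata()); // prints false
        retention.onSnapshotState(11L);
        retention.onCheckpointComplete(11L);
        System.out.println(retention.canReleaseSnapshotMetadata()); // prints true
    }
}
```

Using `>=` rather than `==` keeps the rule correct when the first checkpoint after the assignment is aborted and a later one completes instead.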
h2. Code Analysis

The issue seems related to the lifecycle of snapshot metadata in the split 
assigner.

In the generic source implementation:
 - `SnapshotSplitAssigner` keeps the following structures in memory:

  - `assignedSplits`
  - `tableSchemas`
  - `splitFinishedOffsets`
  - `splitFinishedCheckpointIds`
 - `onFinishedSplits()` appends finished split offsets into 
`splitFinishedOffsets` and records checkpoint ids in 
`splitFinishedCheckpointIds`.

 - `snapshotState()` always creates `SnapshotPendingSplitsState` with the full 
snapshot state:

  - `alreadyProcessedTables`
  - `remainingSplits`
  - `assignedSplits`
  - `tableSchemas`
  - `splitFinishedOffsets`
  - `splitFinishedCheckpointIds`
  - `chunkSplitterState`
 - `HybridSplitAssigner.snapshotState()` wraps this full snapshot state inside 
`HybridPendingSplitsState`, even after the stream split has already been 
assigned.

 - `PendingSplitsStateSerializer.serializeHybridPendingSplitsState()` 
serializes the full `SnapshotPendingSplitsState` first and then appends only 
the `isStreamSplitAssigned` flag, so the snapshot metadata is written whether 
or not the stream split has been assigned.

 - `notifyCheckpointComplete()` removes entries from 
`splitFinishedCheckpointIds`, but it does not remove the heavyweight metadata 
in `assignedSplits` or `splitFinishedOffsets`.

This means that after the snapshot phase is complete, large snapshot metadata 
can still be retained in the live coordinator object and serialized into later 
checkpoints.
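
The retention described above can be reproduced with a stripped-down model of the bookkeeping. This is a hypothetical sketch, not the real class: field names mirror `SnapshotSplitAssigner`, but splits and offsets are reduced to strings and longs, and the `UNDEFINED` marker for splits finished since the last checkpoint is an assumption about how checkpoint IDs are recorded.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the assigner bookkeeping; NOT the real
// Flink CDC SnapshotSplitAssigner.
final class SimplifiedSnapshotAssigner {
    static final long UNDEFINED = -1L; // assumed marker: finished, not yet checkpointed

    final Map<String, String> assignedSplits = new HashMap<>();           // splitId -> split
    final Map<String, Long> splitFinishedOffsets = new HashMap<>();       // splitId -> offset
    final Map<String, Long> splitFinishedCheckpointIds = new HashMap<>(); // splitId -> checkpoint id

    void assign(String splitId) {
        assignedSplits.put(splitId, "split-" + splitId);
    }

    void onFinishedSplits(Map<String, Long> finishedOffsets) {
        splitFinishedOffsets.putAll(finishedOffsets);
        for (String splitId : finishedOffsets.keySet()) {
            splitFinishedCheckpointIds.put(splitId, UNDEFINED);
        }
    }

    void snapshotState(long checkpointId) {
        // Tag splits finished since the previous checkpoint with this checkpoint id.
        splitFinishedCheckpointIds.replaceAll((id, cp) -> cp == UNDEFINED ? checkpointId : cp);
    }

    void notifyCheckpointComplete(long checkpointId) {
        // Only the checkpoint-id bookkeeping is pruned...
        splitFinishedCheckpointIds.values().removeIf(cp -> cp != UNDEFINED && cp <= checkpointId);
        // ...assignedSplits and splitFinishedOffsets are left untouched.
    }

    public static void main(String[] args) {
        SimplifiedSnapshotAssigner assigner = new SimplifiedSnapshotAssigner();
        Map<String, Long> finished = new HashMap<>();
        for (int i = 0; i < 10_000; i++) {
            String id = "t1:" + i;
            assigner.assign(id);
            finished.put(id, (long) i);
        }
        assigner.onFinishedSplits(finished);
        assigner.snapshotState(1L);
        assigner.notifyCheckpointComplete(1L);
        // prints assigned=10000 offsets=10000 pendingCheckpointIds=0
        System.out.printf("assigned=%d offsets=%d pendingCheckpointIds=%d%n",
                assigner.assignedSplits.size(),
                assigner.splitFinishedOffsets.size(),
                assigner.splitFinishedCheckpointIds.size());
    }
}
```

After the checkpoint completes, only `splitFinishedCheckpointIds` shrinks; the other two maps keep one entry per split, which matches the growth pattern reported above.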

The same pattern also exists in the MySQL-specific legacy path:
 - `MySqlSnapshotSplitAssigner`
 - `MySqlHybridSplitAssigner`
 - `PendingSplitsStateSerializer`

For example, `MySqlHybridSplitAssigner.snapshotState()` still wraps 
`snapshotSplitAssigner.snapshotState(checkpointId)` in 
`HybridPendingSplitsState`, and the MySQL snapshot state serializer writes 
`assignedSplits`, `splitFinishedOffsets`, and `tableSchemas`.
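
To get a feel for how much of the hybrid checkpoint is snapshot metadata, the sketch below serializes a finished-offset map in the order described above (snapshot data first, `isStreamSplitAssigned` last). It is an illustration under assumptions: the byte layout is made up and is not the real `PendingSplitsStateSerializer` format, split IDs and offsets are simplified to strings and longs, and `pruneOnceStreamAssigned` is a hypothetical switch, not an existing option.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Illustrative layout only, NOT the real PendingSplitsStateSerializer format.
final class HybridStateSizeSketch {
    private HybridStateSizeSketch() {}

    static byte[] serialize(Map<String, Long> splitFinishedOffsets,
                            boolean isStreamSplitAssigned,
                            boolean pruneOnceStreamAssigned) {
        boolean writeSnapshotMaps = !(pruneOnceStreamAssigned && isStreamSplitAssigned);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(writeSnapshotMaps ? splitFinishedOffsets.size() : 0);
            if (writeSnapshotMaps) {
                for (Map.Entry<String, Long> e : splitFinishedOffsets.entrySet()) {
                    out.writeUTF(e.getKey());    // split id
                    out.writeLong(e.getValue()); // stand-in for a binlog offset
                }
            }
            out.writeBoolean(isStreamSplitAssigned); // flag written last
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        for (int i = 0; i < 100_000; i++) {
            offsets.put("db.tbl:" + i, (long) i);
        }
        int full = serialize(offsets, true, false).length;
        int pruned = serialize(offsets, true, true).length;
        System.out.printf("full=%d bytes, pruned=%d bytes%n", full, pruned);
    }
}
```

With pruning, the payload no longer depends on the number of finished splits; a real fix would also have to keep restore behavior correct for checkpoints taken before the stream split was assigned.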
h2. Direct Memory / Container OOM Risk

There is another important factor that makes this issue easier to trigger in 
containerized deployments.

The snapshot split metadata is retained in the JobManager-side source 
coordinator. During checkpointing, this large coordinator state needs to be 
serialized and persisted/transferred. This may create multiple copies of the 
same metadata in a short period of time, including:
 - live Java objects in the coordinator, such as `assignedSplits` and 
`splitFinishedOffsets`;
 - serialized coordinator state bytes;
 - direct/off-heap buffers used by RPC, network, or checkpoint storage clients 
during state transfer.
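
The copies listed above can be made concrete with a small JVM-level illustration (not Flink code). It keeps the same payload once as a heap `byte[]` and once in a direct buffer, and reads the JDK's `direct` buffer pool through `BufferPoolMXBean`. Whether Flink's RPC or checkpoint storage clients actually copy coordinator state into direct buffers depends on the transport and file system in use; this sketch only shows how such a copy is accounted. `DirectCopySketch` is a hypothetical name.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

// Illustration only: one heap copy plus one direct copy of the same bytes.
final class DirectCopySketch {
    private DirectCopySketch() {}

    /** Bytes reported by the JVM's "direct" buffer pool, or -1 if unavailable. */
    static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    /** Copies heap bytes into a new direct buffer, as an I/O client might before writing. */
    static ByteBuffer copyToDirect(byte[] serializedState) {
        ByteBuffer direct = ByteBuffer.allocateDirect(serializedState.length);
        direct.put(serializedState);
        direct.flip(); // ready to be read/written out
        return direct;
    }

    public static void main(String[] args) {
        byte[] serialized = new byte[16 * 1024 * 1024]; // stand-in for 16 MiB of serialized state
        long before = directMemoryUsed();
        ByteBuffer direct = copyToDirect(serialized);
        long after = directMemoryUsed();
        Runtime rt = Runtime.getRuntime();
        System.out.printf("heap used: %d MiB, direct pool grew by %d MiB (buffer capacity %d MiB)%n",
                (rt.totalMemory() - rt.freeMemory()) >> 20,
                (after - before) >> 20,
                direct.capacity() >> 20);
    }
}
```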

For the JobManager, Flink does not enable the JVM direct memory limit by 
default: `jobmanager.memory.enable-jvm-direct-memory-limit` defaults to 
`false`. When it is disabled, Flink does not set `-XX:MaxDirectMemorySize` for 
the JobManager process, so excessive direct memory usage may not fail early 
with a JVM-level `OutOfMemoryError: Direct buffer memory`.
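
One way to check whether a running JobManager actually has a direct memory cap is to look for `-XX:MaxDirectMemorySize` among its JVM arguments. A minimal, hypothetical diagnostic using only the JDK (`RuntimeMXBean.getInputArguments()`); `DirectMemoryLimitCheck` is not part of Flink. Per the Flink documentation, enabling `jobmanager.memory.enable-jvm-direct-memory-limit` sets the limit to the value of `jobmanager.memory.off-heap.size`.

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Optional;

// Hypothetical diagnostic helper, not part of Flink.
final class DirectMemoryLimitCheck {
    private static final String FLAG = "-XX:MaxDirectMemorySize=";

    private DirectMemoryLimitCheck() {}

    /** Value of the last -XX:MaxDirectMemorySize= argument, if any (later flags override earlier ones). */
    static Optional<String> explicitLimit(List<String> jvmArgs) {
        Optional<String> limit = Optional.empty();
        for (String arg : jvmArgs) {
            if (arg.startsWith(FLAG)) {
                limit = Optional.of(arg.substring(FLAG.length()));
            }
        }
        return limit;
    }

    public static void main(String[] args) {
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        System.out.println(explicitLimit(jvmArgs)
                .map(v -> "explicit -XX:MaxDirectMemorySize=" + v)
                .orElse("no explicit -XX:MaxDirectMemorySize (JVM default applies)"));
    }
}
```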

In containerized environments, the container memory limit still applies to the 
whole process RSS. Therefore, a large coordinator state may increase 
direct/off-heap memory usage during checkpoint serialization or transfer and 
eventually cause the container to be OOMKilled, even if the Java heap itself 
does not obviously exceed its limit.
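
Because the container limit applies to the whole process, it can help to compare process RSS with heap usage from inside the JVM. A Linux-only sketch, assuming `/proc/self/status` is readable (its `VmRSS` field is reported in kB); `ProcessRss` is a hypothetical helper, and RSS is used here only as a rough proxy for what the container runtime accounts against its limit.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.OptionalLong;

// Linux-only, hypothetical helper: compares process RSS with Java heap usage.
final class ProcessRss {
    private ProcessRss() {}

    /** Parses the VmRSS value (in kB) from the lines of /proc/self/status. */
    static OptionalLong parseVmRssKiB(List<String> statusLines) {
        for (String line : statusLines) {
            if (line.startsWith("VmRSS:")) {
                String[] parts = line.substring("VmRSS:".length()).trim().split("\\s+");
                return OptionalLong.of(Long.parseLong(parts[0]));
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) throws IOException {
        Path status = Path.of("/proc/self/status");
        if (!Files.exists(status)) {
            System.out.println("/proc/self/status not available (not Linux?)");
            return;
        }
        OptionalLong rssKiB = parseVmRssKiB(Files.readAllLines(status));
        Runtime rt = Runtime.getRuntime();
        long heapUsedKiB = (rt.totalMemory() - rt.freeMemory()) / 1024;
        // A large gap between RSS and heap hints at off-heap usage (direct buffers, metaspace, threads...).
        System.out.printf("RSS: %d KiB, heap used: %d KiB%n", rssKiB.orElse(-1), heapUsedKiB);
    }
}
```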

This makes the retained snapshot split metadata more dangerous: it is not only 
a large heap/coordinator-state problem, but can also amplify JobManager direct 
memory pressure during every checkpoint.

 

> Snapshot split metadata is not released after switching to stream phase, 
> causing large JobManager coordinator state and OOM with many snapshot splits
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39775
>                 URL: https://issues.apache.org/jira/browse/FLINK-39775
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: yuanfenghu
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
