[ https://issues.apache.org/jira/browse/FLINK-38184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hongshun Wang updated FLINK-38184:
----------------------------------
    Description: 
When I have a large Postgres source table (about 1 billion rows), a checkpoint takes several minutes and blocks the whole read. The main cost is in 
org.apache.flink.core.memory.DataOutputSerializer#getCopyOfBuffer.

!image-2025-08-04-11-03-18-074.png!

In the CDC base framework, SourceSplitSerializer invokes 
DataOutputSerializer#getCopyOfBuffer for each finished split info:
{code:java}
// org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSerializer#writeFinishedSplitsInfo
private void writeFinishedSplitsInfo(
        List<FinishedSnapshotSplitInfo> finishedSplitsInfo, DataOutputSerializer out)
        throws IOException {
    final int size = finishedSplitsInfo.size();
    out.writeInt(size);
    for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) {
        splitInfo.serialize(out);
    }
}

// org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo#serialize(org.apache.flink.core.memory.DataOutputSerializer)
public byte[] serialize(final DataOutputSerializer out) throws IOException {
    out.writeUTF(this.getTableId().toString());
    out.writeUTF(this.getSplitId());
    out.writeUTF(SerializerUtils.rowToSerializedString(this.getSplitStart()));
    out.writeUTF(SerializerUtils.rowToSerializedString(this.getSplitEnd()));
    out.writeUTF(SerializerUtils.rowToSerializedString(this.offsetFactory));
    writeOffsetPosition(this.getHighWatermark(), out);
    boolean useCatalogBeforeSchema =
            SerializerUtils.shouldUseCatalogBeforeSchema(this.getTableId());
    out.writeBoolean(useCatalogBeforeSchema);
    return out.getCopyOfBuffer();
} {code}
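To make the cost concrete: every getCopyOfBuffer() call allocates and copies the entire buffer written so far, and writeFinishedSplitsInfo discards the returned array anyway. A minimal stand-alone sketch of the effect (the split count and field values are made up purely for illustration, this is not connector code):
{code:java}
import org.apache.flink.core.memory.DataOutputSerializer;

public class CopyPerSplitCost {
    public static void main(String[] args) throws Exception {
        final int numFinishedSplits = 10_000; // assumed count, for illustration only
        final DataOutputSerializer out = new DataOutputSerializer(64);

        long bytesCopiedPerInfo = 0;
        for (int i = 0; i < numFinishedSplits; i++) {
            out.writeUTF("db.schema.table"); // stands in for tableId
            out.writeUTF("split-" + i);      // stands in for splitId
            // copies everything written so far, like FinishedSnapshotSplitInfo#serialize does
            bytesCopiedPerInfo += out.getCopyOfBuffer().length;
        }
        // a single copy at the end would have been enough
        final long bytesCopiedOnce = out.getCopyOfBuffer().length;

        System.out.println("bytes copied with per-info copies: " + bytesCopiedPerInfo);
        System.out.println("bytes copied with a single copy:   " + bytesCopiedOnce);
    }
} {code}
With N finished splits, the per-info copies add up to roughly N/2 times the final buffer size, so the work grows quadratically with the number of finished splits. That matches the minutes-long checkpoints above.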
However, the MySQL CDC connector handles this differently:

 
{code:java}
// org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer#writeFinishedSplitsInfo
private static void writeFinishedSplitsInfo(
        List<FinishedSnapshotSplitInfo> finishedSplitsInfo, DataOutputSerializer out)
        throws IOException {
    final int size = finishedSplitsInfo.size();
    out.writeInt(size);
    for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) {
        out.writeUTF(splitInfo.getTableId().toString());
        out.writeUTF(splitInfo.getSplitId());
        out.writeUTF(rowToSerializedString(splitInfo.getSplitStart()));
        out.writeUTF(rowToSerializedString(splitInfo.getSplitEnd()));
        writeBinlogPosition(splitInfo.getHighWatermark(), out);
    }
} {code}
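Here all finished split info is written into the shared serializer and the buffer is only materialized once by the caller, roughly like this (a simplified, hypothetical caller sketch, not the actual MySqlSplitSerializer code):
{code:java}
// hypothetical caller sketch: one shared serializer, one copy for the whole list
final DataOutputSerializer out = new DataOutputSerializer(64);
writeFinishedSplitsInfo(finishedSplitsInfo, out); // no getCopyOfBuffer inside the loop
final byte[] serialized = out.getCopyOfBuffer();  // single copy at the end {code}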
 

 

To be honest, it's a redundant operation: each getCopyOfBuffer() call copies the entire buffer written so far, and writeFinishedSplitsInfo discards the returned array. For one split, a single call to out.getCopyOfBuffer is enough, rather than one call per finished split info.
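A possible direction (only a sketch; the void/byte[] split and the method shapes are my assumption, not a final design): let FinishedSnapshotSplitInfo write into the caller's serializer without copying, and keep a separate overload for callers that really need stand-alone bytes:
{code:java}
// writes into the caller's DataOutputSerializer, no copy here
public void serialize(final DataOutputSerializer out) throws IOException {
    out.writeUTF(this.getTableId().toString());
    out.writeUTF(this.getSplitId());
    out.writeUTF(SerializerUtils.rowToSerializedString(this.getSplitStart()));
    out.writeUTF(SerializerUtils.rowToSerializedString(this.getSplitEnd()));
    out.writeUTF(SerializerUtils.rowToSerializedString(this.offsetFactory));
    writeOffsetPosition(this.getHighWatermark(), out);
    out.writeBoolean(SerializerUtils.shouldUseCatalogBeforeSchema(this.getTableId()));
    // no out.getCopyOfBuffer() here: the caller copies once for the whole split
}

// only for callers that need the bytes of a single FinishedSnapshotSplitInfo
public byte[] serialize() throws IOException {
    final DataOutputSerializer out = new DataOutputSerializer(64);
    serialize(out);
    return out.getCopyOfBuffer(); // single copy
} {code}
writeFinishedSplitsInfo can then keep calling splitInfo.serialize(out) in its loop, and only the top-level split serializer copies the buffer once per checkpointed split.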

  was:
When I have a large Postgres source table (about 1 billion rows), a checkpoint takes several minutes and blocks the whole read. The main cost is in 
org.apache.flink.core.memory.DataOutputSerializer#getCopyOfBuffer.

!image-2025-08-04-11-03-18-074.png!

To be honest, it's a redundant operation. For one split, a single call to out.getCopyOfBuffer is enough, rather than multiple calls.

{code:java}
private void writeFinishedSplitsInfo(
        List<FinishedSnapshotSplitInfo> finishedSplitsInfo, DataOutputSerializer out)
        throws IOException {
    final int size = finishedSplitsInfo.size();
    out.writeInt(size);
    for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) {
        splitInfo.serialize(out);
    }
} {code}


> CDC no need to getCopyOfBuffer for each split info.
> ---------------------------------------------------
>
>                 Key: FLINK-38184
>                 URL: https://issues.apache.org/jira/browse/FLINK-38184
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.4.0
>            Reporter: Hongshun Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: cdc-3.5.0
>
>         Attachments: image-2025-08-04-11-03-18-074.png
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
