[ 
https://issues.apache.org/jira/browse/FLINK-38184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-38184:
----------------------------------
    Priority: Critical  (was: Major)

> CDC no need to getCopyOfBuffer for each split info.
> ---------------------------------------------------
>
>                 Key: FLINK-38184
>                 URL: https://issues.apache.org/jira/browse/FLINK-38184
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.4.0
>            Reporter: Hongshun Wang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: cdc-3.5.0
>
>         Attachments: image-2025-08-04-11-03-18-074.png
>
>
> When the Postgres source table is large (about 1 billion rows), a checkpoint 
> takes several minutes and blocks all reading while it runs. Most of that time 
> is spent in org.apache.flink.core.memory.DataOutputSerializer#getCopyOfBuffer.
> !image-2025-08-04-11-03-18-074.png!
> In the CDC base framework, SourceSplitSerializer invokes 
> DataOutputSerializer#getCopyOfBuffer once for each finished split info:
> {code:java}
> // org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSerializer#writeFinishedSplitsInfo
> private void writeFinishedSplitsInfo(
>         List<FinishedSnapshotSplitInfo> finishedSplitsInfo, DataOutputSerializer out)
>         throws IOException {
>     final int size = finishedSplitsInfo.size();
>     out.writeInt(size);
>     for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) {
>         splitInfo.serialize(out);
>     }
> }
> // org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo#serialize(org.apache.flink.core.memory.DataOutputSerializer)
> public byte[] serialize(final DataOutputSerializer out) throws IOException {
>     out.writeUTF(this.getTableId().toString());
>     out.writeUTF(this.getSplitId());
>     out.writeUTF(SerializerUtils.rowToSerializedString(this.getSplitStart()));
>     out.writeUTF(SerializerUtils.rowToSerializedString(this.getSplitEnd()));
>     out.writeUTF(SerializerUtils.rowToSerializedString(this.offsetFactory));
>     writeOffsetPosition(this.getHighWatermark(), out);
>     boolean useCatalogBeforeSchema =
>             SerializerUtils.shouldUseCatalogBeforeSchema(this.getTableId());
>     out.writeBoolean(useCatalogBeforeSchema);
>     return out.getCopyOfBuffer();
> } {code}
> The MySQL CDC serializer is different: it writes each field directly and 
> does not call getCopyOfBuffer per finished split info:
>  
> {code:java}
> // org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer#writeFinishedSplitsInfo
> private static void writeFinishedSplitsInfo(
>         List<FinishedSnapshotSplitInfo> finishedSplitsInfo, DataOutputSerializer out)
>         throws IOException {
>     final int size = finishedSplitsInfo.size();
>     out.writeInt(size);
>     for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) {
>         out.writeUTF(splitInfo.getTableId().toString());
>         out.writeUTF(splitInfo.getSplitId());
>         out.writeUTF(rowToSerializedString(splitInfo.getSplitStart()));
>         out.writeUTF(rowToSerializedString(splitInfo.getSplitEnd()));
>         writeBinlogPosition(splitInfo.getHighWatermark(), out);
>     }
> } {code}
>  
>  
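> The per-split copy is redundant: writeFinishedSplitsInfo ignores the returned 
> array, and every call copies everything written to the buffer so far. 
> Serializing one split needs only a single out.getCopyOfBuffer call, not one 
> per finished split info.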
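
To illustrate why the per-split copy becomes expensive, here is a minimal
stand-alone sketch. It does not use any Flink classes: as an assumption for
illustration, java.io.ByteArrayOutputStream#toByteArray stands in for
DataOutputSerializer#getCopyOfBuffer, since both return a copy of all bytes
written so far. The class name, method names, and split ids are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-alone analogy: ByteArrayOutputStream#toByteArray plays the role of
// DataOutputSerializer#getCopyOfBuffer. No Flink classes are used.
final class CopyPerSplitSketch {

    /** Serialized bytes plus the total number of bytes copied to produce them. */
    static final class Result {
        final byte[] bytes;
        final long copiedBytes;

        Result(byte[] bytes, long copiedBytes) {
            this.bytes = bytes;
            this.copiedBytes = copiedBytes;
        }
    }

    /** Base-framework pattern: copy the whole buffer after every split. */
    static Result copyPerSplit(List<String> splitIds) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        long copied = 0;
        out.writeInt(splitIds.size());
        for (String splitId : splitIds) {
            out.writeUTF(splitId);
            // Copies everything written so far; the copy is then thrown away.
            copied += buffer.toByteArray().length;
        }
        byte[] bytes = buffer.toByteArray();
        return new Result(bytes, copied + bytes.length);
    }

    /** MySQL-style pattern: write every split, copy the buffer once. */
    static Result copyOnce(List<String> splitIds) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeInt(splitIds.size());
        for (String splitId : splitIds) {
            out.writeUTF(splitId);
        }
        byte[] bytes = buffer.toByteArray();
        return new Result(bytes, bytes.length);
    }

    public static void main(String[] args) throws IOException {
        List<String> splitIds = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            splitIds.add("hypothetical_table:" + i); // made-up split ids
        }
        Result perSplit = copyPerSplit(splitIds);
        Result once = copyOnce(splitIds);
        System.out.println("same output: " + Arrays.equals(perSplit.bytes, once.bytes));
        System.out.println("bytes copied, per split: " + perSplit.copiedBytes);
        System.out.println("bytes copied, once:      " + once.copiedBytes);
    }
}
```

Both methods produce identical bytes, but the per-split variant copies an
amount that grows roughly quadratically with the number of finished split
infos, which would be consistent with the checkpoint cost described above.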



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
