[ https://issues.apache.org/jira/browse/FLINK-38184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hongshun Wang updated FLINK-38184:
----------------------------------
    Description: 
When reading a large Postgres source table (about 1 billion records), each checkpoint takes several minutes and blocks the whole read. Most of that time is spent in org.apache.flink.core.memory.DataOutputSerializer#getCopyOfBuffer.

!image-2025-08-04-11-03-18-074.png!

In the CDC base framework, the split serializer ends up invoking DataOutputSerializer#getCopyOfBuffer once for each finished split info, because FinishedSnapshotSplitInfo#serialize returns out.getCopyOfBuffer():
{code:java}
// org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSerializer#writeFinishedSplitsInfo
private void writeFinishedSplitsInfo(
        List<FinishedSnapshotSplitInfo> finishedSplitsInfo, DataOutputSerializer out)
        throws IOException {
    final int size = finishedSplitsInfo.size();
    out.writeInt(size);
    for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) {
        // The returned byte[] is discarded here, but the copy is still made.
        splitInfo.serialize(out);
    }
}

// org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo#serialize(org.apache.flink.core.memory.DataOutputSerializer)
public byte[] serialize(final DataOutputSerializer out) throws IOException {
    out.writeUTF(this.getTableId().toString());
    out.writeUTF(this.getSplitId());
    out.writeUTF(SerializerUtils.rowToSerializedString(this.getSplitStart()));
    out.writeUTF(SerializerUtils.rowToSerializedString(this.getSplitEnd()));
    out.writeUTF(SerializerUtils.rowToSerializedString(this.offsetFactory));
    writeOffsetPosition(this.getHighWatermark(), out);
    boolean useCatalogBeforeSchema =
            SerializerUtils.shouldUseCatalogBeforeSchema(this.getTableId());
    out.writeBoolean(useCatalogBeforeSchema);
    return out.getCopyOfBuffer();
}
{code}
MySQL CDC does it differently: it writes each finished split info directly into the shared output and makes no per-split copy.
{code:java}
// org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer#writeFinishedSplitsInfo
private static void writeFinishedSplitsInfo(
        List<FinishedSnapshotSplitInfo> finishedSplitsInfo, DataOutputSerializer out)
        throws IOException {
    final int size = finishedSplitsInfo.size();
    out.writeInt(size);
    for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) {
        out.writeUTF(splitInfo.getTableId().toString());
        out.writeUTF(splitInfo.getSplitId());
        out.writeUTF(rowToSerializedString(splitInfo.getSplitStart()));
        out.writeUTF(rowToSerializedString(splitInfo.getSplitEnd()));
        writeBinlogPosition(splitInfo.getHighWatermark(), out);
    }
}
{code}
The per-split copy is redundant. For one split, out.getCopyOfBuffer only needs to be called once, not once per finished split info.

  was:
When reading a large Postgres source table (about 1 billion records), each checkpoint takes several minutes and blocks the whole read. Most of that time is spent in org.apache.flink.core.memory.DataOutputSerializer#getCopyOfBuffer.

!image-2025-08-04-11-03-18-074.png!

The per-split copy is redundant. For one split, out.getCopyOfBuffer only needs to be called once, not once per finished split info.

```java
private void writeFinishedSplitsInfo(
        List<FinishedSnapshotSplitInfo> finishedSplitsInfo, DataOutputSerializer out)
        throws IOException {
    final int size = finishedSplitsInfo.size();
    out.writeInt(size);
    for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) {
        splitInfo.serialize(out);
    }
}
```

> CDC no need to getCopyOfBuffer for each split info.
> ---------------------------------------------------
>
>                 Key: FLINK-38184
>                 URL: https://issues.apache.org/jira/browse/FLINK-38184
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.4.0
>            Reporter: Hongshun Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: cdc-3.5.0
>
>         Attachments: image-2025-08-04-11-03-18-074.png
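The cost pattern described in this issue can be illustrated with a small, self-contained sketch. It does not use Flink: java.io.ByteArrayOutputStream and DataOutputStream stand in for DataOutputSerializer, and toByteArray() stands in for getCopyOfBuffer(), on the assumption that both copy every byte written so far. The SplitInfo class and all method names below are hypothetical simplifications, not the Flink CDC types or the actual fix.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class FinishedSplitSerializationSketch {

    /** Hypothetical, simplified stand-in for FinishedSnapshotSplitInfo. */
    static final class SplitInfo {
        final String tableId;
        final String splitId;

        SplitInfo(String tableId, String splitId) {
            this.tableId = tableId;
            this.splitId = splitId;
        }

        /** Writes the fields into the shared output; no copy of the buffer is made. */
        void writeTo(DataOutputStream out) throws IOException {
            out.writeUTF(tableId);
            out.writeUTF(splitId);
        }
    }

    /**
     * Pattern from the report: after every split info, copy the whole shared buffer.
     * Returns the total number of bytes copied, which grows roughly quadratically
     * with the number of split infos.
     */
    static long copiedBytesWhenCopyingPerSplit(List<SplitInfo> splits) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeInt(splits.size());
        long copied = 0;
        for (SplitInfo split : splits) {
            split.writeTo(out);
            // Stand-in for getCopyOfBuffer(): copies everything written so far.
            copied += buffer.toByteArray().length;
        }
        return copied;
    }

    /** Alternative: write every split info first, then copy the buffer exactly once. */
    static byte[] serializeWithSingleCopy(List<SplitInfo> splits) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeInt(splits.size());
        for (SplitInfo split : splits) {
            split.writeTo(out);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        List<SplitInfo> splits = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            splits.add(new SplitInfo("public.orders", "public.orders:" + i));
        }
        byte[] bytes = serializeWithSingleCopy(splits);
        long copiedPerSplit = copiedBytesWhenCopyingPerSplit(splits);
        System.out.println("serialized size in bytes: " + bytes.length);
        System.out.println("bytes copied with one copy per split info: " + copiedPerSplit);
    }
}
```

Both methods write the same bytes to the shared output. The difference is only in how many bytes are copied along the way, which is why dropping the per-split copy should not change the serialized format.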