[ https://issues.apache.org/jira/browse/FLINK-38184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hongshun Wang updated FLINK-38184: ---------------------------------- Priority: Critical (was: Major) > CDC no need to getCopyOfBuffer for each split info. > --------------------------------------------------- > > Key: FLINK-38184 > URL: https://issues.apache.org/jira/browse/FLINK-38184 > Project: Flink > Issue Type: Improvement > Components: Flink CDC > Affects Versions: cdc-3.4.0 > Reporter: Hongshun Wang > Priority: Critical > Labels: pull-request-available > Fix For: cdc-3.5.0 > > Attachments: image-2025-08-04-11-03-18-074.png > > > When I have a big Postgres source table(1 billion data), then the checkpoint > will cost multiple minutes which will block the whole reading. The main cost > is org.apache.flink.core.memory.DataOutputSerializer#getCopyOfBuffer. > !image-2025-08-04-11-03-18-074.png! > In cdc framework, SplitSerializer will invoke > DataOutputSerializer#getCopyOfBuffer for each finished split info > {code:java} > // > org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSerializer#writeFinishedSplitsInfo > private void writeFinishedSplitsInfo( > List<FinishedSnapshotSplitInfo> finishedSplitsInfo, > DataOutputSerializer out) > throws IOException { > final int size = finishedSplitsInfo.size(); > out.writeInt(size); > for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) { > splitInfo.serialize(out); > } > } > //org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo#serialize(org.apache.flink.core.memory.DataOutputSerializer) > public byte[] serialize(final DataOutputSerializer out) throws IOException { > out.writeUTF(this.getTableId().toString()); > out.writeUTF(this.getSplitId()); > out.writeUTF(SerializerUtils.rowToSerializedString(this.getSplitStart())); > out.writeUTF(SerializerUtils.rowToSerializedString(this.getSplitEnd())); > out.writeUTF(SerializerUtils.rowToSerializedString(this.offsetFactory)); > writeOffsetPosition(this.getHighWatermark(), out); > boolean useCatalogBeforeSchema = > SerializerUtils.shouldUseCatalogBeforeSchema(this.getTableId()); > out.writeBoolean(useCatalogBeforeSchema); > return out.getCopyOfBuffer(); > } {code} > However, it's different in mysql cdc > > {code:java} > // > org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer#writeFinishedSplitsInfo > private static void writeFinishedSplitsInfo( > List<FinishedSnapshotSplitInfo> finishedSplitsInfo, > DataOutputSerializer out) > throws IOException { > final int size = finishedSplitsInfo.size(); > out.writeInt(size); > for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) { > out.writeUTF(splitInfo.getTableId().toString()); > out.writeUTF(splitInfo.getSplitId()); > out.writeUTF(rowToSerializedString(splitInfo.getSplitStart())); > out.writeUTF(rowToSerializedString(splitInfo.getSplitEnd())); > writeBinlogPosition(splitInfo.getHighWatermark(), out); > } > } {code} > > > To be honest, it's a redundant operation. For one split, it only need one > time of out.getCopyOfBuffer rather than multiple times. -- This message was sent by Atlassian Jira (v8.20.10#820010)