lzshlzsh opened a new pull request, #2571: URL: https://github.com/apache/flink-cdc/pull/2571
PR of #2570 ### Motivation It's very time-consuming for postgres to refresh schema if there are many tables to read. According to our testing, refreshing 400 tables takes 15 minutes. Because it takes a long time to refresh the current table's schema when reading a chunk in the scan stage, the data rate shows the following sawtooth pattern. Therefore, we need to minimize unnecessary shema refreshes as much as possible。  ### Solution Firstly, the origin schema of postgres cdc is [the schema filed of PostgresSourceFetchTaskContext](https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java#L91), which is created and refreshed when [PostgresSourceFetchTaskContext#configure](https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java#L176) is called, and both [the schema refresh of scan stage](https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java#L281) and [stream stage](https://github.com/ververica/flink-cdc-connectors/blob/e 0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java#L118) are refreshing the schema of PostgresSourceFetchTaskContext. A new PostgresSourceFetchTaskContext is created in IncrementalSourceSplitReader#checkSplitOrStartNext for each split (both [SnapshotSplit](https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java#L124) and [StreamSplit](https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java#L134)). For snapshot splits, even with the condition of [whether the currentFetcher is equal to null](https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277 /flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java#L129), in many cases it still leads to the re creation of the PostgresSourceFetchTaskContext because the IncrementalSourceSplitReader is often discarded by the Flink kernal when a snapshot chunk is finished and become idle. See 1. [SourceReaderBase#pollNext](https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L149) -> [SourceReaderBase#finishedOrAvailableLater](https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L332) -> [SplitFetcherManager#maybeShutdownFinishedFetchers](https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java#L233) -> SplitFetcher#shutdown and SplitFetcher is removed. 2. IncrementalSourceSplitReader(implements SplitReader) is [SplitFetcher#splitReader](https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L57) and removed also. Based on the analysis of the above, we get two optimizations. 1. It's enough to refresh the schema when [PostgresSourceFetchTaskContext#configure](https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java#L176) is called, and there is no need to refresh the schema afterwards. 2. Reuse PostgresSourceFetchTaskContext between SnapshotSplits based on sourceConfig, as PostgresSourceFetchTaskContext is created for almost every SnapshotSplit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org