lzshlzsh opened a new pull request, #2571:
URL: https://github.com/apache/flink-cdc/pull/2571

   PR of #2570 
   
   ### Motivation
   It's very time-consuming for postgres to refresh schema if there are many 
tables to read. According to our testing, refreshing 400 tables takes 15 
minutes. Because it takes a long time to refresh the current table's schema 
when reading a chunk in the scan stage, the data rate shows the following 
sawtooth pattern. Therefore, we need to minimize unnecessary shema refreshes as 
much as possible。
   
   
![image](https://github.com/ververica/flink-cdc-connectors/assets/5181963/8208864a-6857-4459-94ae-034acf9be52f)
   
   ### Solution
   Firstly, the origin schema of postgres cdc is [the schema filed of 
PostgresSourceFetchTaskContext](https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java#L91),
 which is created and refreshed when 
[PostgresSourceFetchTaskContext#configure](https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java#L176)
 is called, and both [the schema refresh of scan 
stage](https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java#L281)
 and [stream stage](https://github.com/ververica/flink-cdc-connectors/blob/e
 
0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java#L118)
 are refreshing the schema of PostgresSourceFetchTaskContext. A new 
PostgresSourceFetchTaskContext is created in 
IncrementalSourceSplitReader#checkSplitOrStartNext for each split (both 
[SnapshotSplit](https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java#L124)
 and 
[StreamSplit](https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java#L134)).
 For snapshot splits, even with the condition of [whether the currentFetcher is 
equal to 
null](https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277
 
/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java#L129),
 in many cases it still leads to the re creation of the 
PostgresSourceFetchTaskContext because the IncrementalSourceSplitReader is 
often discarded by the Flink kernal when a snapshot chunk is finished and 
become idle. See
   1. 
[SourceReaderBase#pollNext](https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L149)
 -> 
[SourceReaderBase#finishedOrAvailableLater](https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L332)
 -> 
[SplitFetcherManager#maybeShutdownFinishedFetchers](https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java#L233)
 -> SplitFetcher#shutdown and SplitFetcher is removed.
   2. IncrementalSourceSplitReader(implements SplitReader) is 
[SplitFetcher#splitReader](https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L57)
 and removed also.
   
   Based on the analysis of the above, we get two optimizations.
   1. It's enough to refresh the schema when 
[PostgresSourceFetchTaskContext#configure](https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java#L176)
 is called, and there is no need to refresh the schema afterwards.
   2. Reuse PostgresSourceFetchTaskContext between SnapshotSplits based on 
sourceConfig, as PostgresSourceFetchTaskContext is created for almost every 
SnapshotSplit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to