[ https://issues.apache.org/jira/browse/FLINK-34820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-34820:
-----------------------------------
    Labels: github-import pull-request-available  (was: github-import)

> [postgres] Remove unnecessary schema fresh to improve performance.
> ------------------------------------------------------------------
>
> Key: FLINK-34820
> URL: https://issues.apache.org/jira/browse/FLINK-34820
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Reporter: Flink CDC Issue Import
> Assignee: ouyangwulin
> Priority: Major
> Labels: github-import, pull-request-available
>
> ### Search before asking
>
> - [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.
>
> ### Motivation
>
> Refreshing the schema is very time-consuming for Postgres when there are many tables to read. In our testing, refreshing 400 tables takes 15 minutes. Because the current table's schema is refreshed every time a chunk is read in the scan stage, and each refresh takes a long time, the data rate shows the following sawtooth pattern.
> Therefore, we need to avoid unnecessary schema refreshes as much as possible.
>
> <img width="437" alt="image" src="https://github.com/ververica/flink-cdc-connectors/assets/5181963/ffadeae0-5e8f-46d7-8941-3c1e3a0e7240">
>
> ### Solution
>
> First, the schema that Postgres CDC works with is held in [the schema field of PostgresSourceFetchTaskContext|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java#L91], which is created and refreshed when [PostgresSourceFetchTaskContext#configure|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java#L176] is called. Both [the schema refresh in the scan stage|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java#L281] and [the one in the stream stage|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java#L118] then refresh the schema of PostgresSourceFetchTaskContext again.
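The refresh pattern described above can be illustrated with a minimal, self-contained sketch. Everything in it is hypothetical: FakeCatalog, FetchTaskContext, readChunk and schemaLoads are illustrative stand-ins, not the Flink CDC or Debezium API. It assumes configure() loads every captured table's schema once and, mirroring the scan-stage behavior, optionally reloads the current table's schema before each chunk; it then counts how many schema loads happen.

```java
import java.util.List;

/** Hypothetical sketch: names are illustrative, not the Flink CDC or Debezium API. */
class SchemaRefreshSketch {

    /** Stand-in for the database catalog; counts how many table schemas were loaded. */
    static final class FakeCatalog {
        int tableLoads = 0;

        void loadTableSchema(String table) {
            tableLoads++; // a real connector would query Postgres metadata here
        }
    }

    /** Stand-in for the fetch task context that owns the in-memory schema. */
    static final class FetchTaskContext {
        private final FakeCatalog catalog;
        private final boolean refreshOnEveryChunk;

        FetchTaskContext(FakeCatalog catalog, boolean refreshOnEveryChunk) {
            this.catalog = catalog;
            this.refreshOnEveryChunk = refreshOnEveryChunk;
        }

        /** Mirrors configure(): each captured table's schema is loaded once. */
        void configure(List<String> tables) {
            for (String table : tables) {
                catalog.loadTableSchema(table);
            }
        }

        /** Reads one snapshot chunk of the given table. */
        void readChunk(String table) {
            if (refreshOnEveryChunk) {
                // Per-chunk behavior: reload the current table's schema before scanning.
                catalog.loadTableSchema(table);
            }
            // ... scan the chunk using the schema loaded in configure() ...
        }
    }

    /** Counts schema loads for one context that reads `chunks` chunks round-robin. */
    static int schemaLoads(List<String> tables, int chunks, boolean refreshOnEveryChunk) {
        FakeCatalog catalog = new FakeCatalog();
        FetchTaskContext ctx = new FetchTaskContext(catalog, refreshOnEveryChunk);
        ctx.configure(tables);
        for (int i = 0; i < chunks; i++) {
            ctx.readChunk(tables.get(i % tables.size()));
        }
        return catalog.tableLoads;
    }

    public static void main(String[] args) {
        List<String> tables = List.of("public.orders", "public.customers");
        System.out.println(schemaLoads(tables, 10, true));  // 12: 2 in configure + 10 per chunk
        System.out.println(schemaLoads(tables, 10, false)); // 2: configure only
    }
}
```

With two tables and ten chunks, the per-chunk variant performs 12 schema loads versus 2 when the schema is loaded only in configure(); under these assumptions the extra loads grow with the number of chunks read, which is consistent with the issue's explanation of the sawtooth pattern.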
> A new PostgresSourceFetchTaskContext is created in IncrementalSourceSplitReader#checkSplitOrStartNext for each split (both [SnapshotSplit|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java#L124] and [StreamSplit|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java#L134]). For snapshot splits, even with [the check on whether currentFetcher is null|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java#L129], the PostgresSourceFetchTaskContext is still re-created in many cases, because the Flink framework often discards the IncrementalSourceSplitReader once a snapshot chunk is finished and the fetcher becomes idle. See:
>
> 1. [SourceReaderBase#pollNext|https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L149] -> [SourceReaderBase#finishedOrAvailableLater|https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L332] -> [SplitFetcherManager#maybeShutdownFinishedFetchers|https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java#L233] -> SplitFetcher#shutdown, after which the SplitFetcher is removed.
> 2. IncrementalSourceSplitReader (which implements SplitReader) is held in [SplitFetcher#splitReader|https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L57], so it is removed as well.
>
> Based on the analysis above, we propose two optimizations:
>
> 1. Refreshing the schema once, when [PostgresSourceFetchTaskContext#configure|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java#L176] is called, is enough; there is no need to refresh it again afterwards.
> 2. Reuse PostgresSourceFetchTaskContext across SnapshotSplits based on sourceConfig, since a new PostgresSourceFetchTaskContext is currently created for almost every SnapshotSplit.
>
> ### Alternatives
>
> _No response_
>
> ### Anything else?
>
> _No response_
>
> ### Are you willing to submit a PR?
>
> - [X] I'm willing to submit a PR!
>
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/apache/flink-cdc/issues/2570
> Created by: [lzshlzsh|https://github.com/lzshlzsh]
> Labels: enhancement,
> Created at: Sun Oct 22 00:28:09 CST 2023
> State: open
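Optimization 2 in the issue (reusing the context across SnapshotSplits based on sourceConfig) could be sketched as a context cache that outlives individual split readers. This is a minimal sketch under stated assumptions, not the connector's implementation: SourceConfig, FetchTaskContext, ContextCache and contextsCreatedFor are hypothetical names, the config type is assumed to have value equality (equals/hashCode), and the cache is assumed to be used by one fetcher thread at a time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch: names are illustrative, not the Flink CDC API. */
class ContextReuseSketch {

    /** Minimal stand-in for the source config; equal configs may share a context. */
    static final class SourceConfig {
        final String hostname;
        final int port;
        final String database;

        SourceConfig(String hostname, int port, String database) {
            this.hostname = hostname;
            this.port = port;
            this.database = database;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof SourceConfig)) return false;
            SourceConfig other = (SourceConfig) o;
            return port == other.port
                    && hostname.equals(other.hostname)
                    && database.equals(other.database);
        }

        @Override
        public int hashCode() {
            return Objects.hash(hostname, port, database);
        }
    }

    /** Stand-in for the fetch task context; configure() represents the expensive schema load. */
    static final class FetchTaskContext {
        int configureCalls = 0;

        void configure() {
            configureCalls++;
        }
    }

    /**
     * Cache that lives outside any split reader, so discarding an idle reader
     * does not force a new context (and a new schema load) for the next split.
     * Assumes access from one fetcher thread at a time (plain HashMap).
     */
    static final class ContextCache {
        private final Map<SourceConfig, FetchTaskContext> contexts = new HashMap<>();
        int created = 0;

        FetchTaskContext getOrCreate(SourceConfig config) {
            // computeIfAbsent runs the factory only on the first lookup for an equal config.
            return contexts.computeIfAbsent(config, c -> {
                created++;
                FetchTaskContext ctx = new FetchTaskContext();
                ctx.configure();
                return ctx;
            });
        }
    }

    /** Simulates `splits` snapshot splits, each served by a short-lived reader. */
    static int contextsCreatedFor(int splits, ContextCache cache) {
        for (int i = 0; i < splits; i++) {
            // Each reader builds its own (equal) config instance and asks the cache.
            cache.getOrCreate(new SourceConfig("localhost", 5432, "inventory"));
        }
        return cache.created;
    }

    public static void main(String[] args) {
        System.out.println(contextsCreatedFor(100, new ContextCache())); // 1
    }
}
```

Reading 100 snapshot splits with equal configs creates and configures a single context, so main() prints 1. Keying the cache on the config means a different connection (for example, another port) still gets its own context; the sketch does not handle schema changes that happen between splits.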