[ https://issues.apache.org/jira/browse/FLINK-34820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-34820:
-----------------------------------
    Labels: github-import pull-request-available  (was: github-import)

> [postgres] Remove unnecessary schema refresh to improve performance.
> --------------------------------------------------------------------
>
>                 Key: FLINK-34820
>                 URL: https://issues.apache.org/jira/browse/FLINK-34820
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>            Reporter: Flink CDC Issue Import
>            Assignee: ouyangwulin
>            Priority: Major
>              Labels: github-import, pull-request-available
>
> ### Search before asking
> - [X] I searched in the 
> [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found 
> nothing similar.
> ### Motivation
> Refreshing the schema is very time-consuming for Postgres when there are many 
> tables to read. In our testing, refreshing 400 tables takes 15 
> minutes. Because refreshing the current table's schema takes a long time 
> whenever a chunk is read in the scan stage, the data rate shows the following 
> sawtooth pattern. We therefore need to eliminate unnecessary schema refreshes 
> wherever possible.
> <img width="437" alt="image" 
> src="https://github.com/ververica/flink-cdc-connectors/assets/5181963/ffadeae0-5e8f-46d7-8941-3c1e3a0e7240">
> ### Solution
> First, the original schema of Postgres CDC is [the schema field of 
> PostgresSourceFetchTaskContext|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java#L91],
>  which is created and refreshed when 
> [PostgresSourceFetchTaskContext#configure|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java#L176]
>  is called. Both [the schema refresh in the scan 
> stage|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java#L281]
>  and [the schema refresh in the stream 
> stage|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java#L118]
>  refresh this same schema of PostgresSourceFetchTaskContext. A new 
> PostgresSourceFetchTaskContext is created in 
> IncrementalSourceSplitReader#checkSplitOrStartNext for each split (both 
> [SnapshotSplit|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java#L124]
>  and 
> [StreamSplit|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java#L134]).
>  For snapshot splits, even with the check of [whether the currentFetcher 
> is equal to 
> null|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java#L129],
>  the PostgresSourceFetchTaskContext is still re-created in many cases, 
> because the Flink runtime often discards the IncrementalSourceSplitReader 
> once a snapshot chunk is finished and the reader becomes idle. See:
> 1. 
> [SourceReaderBase#pollNext|https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L149]
>  -> 
> [SourceReaderBase#finishedOrAvailableLater|https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L332]
>  -> 
> [SplitFetcherManager#maybeShutdownFinishedFetchers|https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java#L233]
>  -> SplitFetcher#shutdown, after which the SplitFetcher is removed. 
> 2. IncrementalSourceSplitReader (which implements SplitReader) is the 
> [SplitFetcher#splitReader|https://github.com/apache/flink/blob/c7beda0da81ffc4bbb01befafd2eed08b7b35854/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L57]
>  field, so it is removed along with the fetcher.
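The lifecycle described above can be illustrated with a small, self-contained Java model. Everything in it is hypothetical: `RefreshCounter`, `FetchTaskContext`, `SplitReader` and `countRefreshes` only mirror the roles of PostgresSourceFetchTaskContext, IncrementalSourceSplitReader and the idle-fetcher shutdown, and a "refresh" is just a counter increment standing in for the slow catalog query. The sketch assumes one refresh in `configure()` plus one more per scanned chunk.

```java
import java.util.List;

// Hypothetical model only: the nested classes mirror the roles of the real
// Flink CDC classes but are not their actual code.
final class CurrentBehaviorSketch {

    /** Counts schema refreshes; each increment stands in for the slow catalog query. */
    static final class RefreshCounter {
        int count;
    }

    /** Stand-in for PostgresSourceFetchTaskContext. */
    static final class FetchTaskContext {
        private final RefreshCounter counter;

        FetchTaskContext(RefreshCounter counter) {
            this.counter = counter;
        }

        /** Like configure(): the schema is created and refreshed here. */
        void configure() {
            counter.count++;
        }

        /** Like reading one snapshot chunk in the scan stage: refreshes the schema again. */
        void readChunk(String chunk) {
            counter.count++;
        }
    }

    /** Stand-in for IncrementalSourceSplitReader: creates its context lazily, once. */
    static final class SplitReader {
        private final RefreshCounter counter;
        private FetchTaskContext context; // null until the first split, like currentFetcher

        SplitReader(RefreshCounter counter) {
            this.counter = counter;
        }

        void handleSnapshotSplit(String chunk) {
            if (context == null) {
                context = new FetchTaskContext(counter);
                context.configure();
            }
            context.readChunk(chunk);
        }
    }

    /**
     * Returns the total number of schema refreshes for the given chunks. When
     * idleFetcherShutDown is true, each chunk gets a brand-new reader, modelling the
     * idle SplitFetcher (and its SplitReader) being removed after every chunk.
     */
    static int countRefreshes(List<String> chunks, boolean idleFetcherShutDown) {
        RefreshCounter counter = new RefreshCounter();
        SplitReader reader = new SplitReader(counter);
        for (String chunk : chunks) {
            if (idleFetcherShutDown) {
                reader = new SplitReader(counter); // old reader and its context are discarded
            }
            reader.handleSnapshotSplit(chunk);
        }
        return counter.count;
    }

    public static void main(String[] args) {
        List<String> chunks = List.of("t1-chunk0", "t1-chunk1", "t2-chunk0", "t2-chunk1");
        System.out.println(countRefreshes(chunks, false)); // 5: one configure + four chunk reads
        System.out.println(countRefreshes(chunks, true));  // 8: configure + read for every chunk
    }
}
```

In this model, discarding the idle reader after every chunk turns the single configure-time refresh into one refresh per chunk, on top of the per-chunk scan-stage refresh.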
> Based on the above analysis, we propose two optimizations:
> 1. It is enough to refresh the schema when 
> [PostgresSourceFetchTaskContext#configure|https://github.com/ververica/flink-cdc-connectors/blob/e0fd6f965b702cc2876372dc068379dafe066277/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java#L176]
>  is called; there is no need to refresh it afterwards.
> 2. Reuse PostgresSourceFetchTaskContext across SnapshotSplits, keyed by 
> sourceConfig, since a new PostgresSourceFetchTaskContext is currently created 
> for almost every SnapshotSplit.
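A minimal sketch of the two proposed optimizations, using a hypothetical model in which the schema is refreshed only in `configure()` and contexts are cached in a `ConcurrentHashMap` keyed by a hypothetical `SourceConfig` record, so a freshly created reader picks up the existing context instead of building a new one. `SourceConfig`, `ContextCache`, `SplitReader` and `countRefreshes` are illustrative names, not Flink CDC APIs; where such a cache would live in the real connector is an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model only: names mirror the issue's proposal, not real Flink CDC code.
final class OptimizedBehaviorSketch {

    /** Hypothetical cache key; a record supplies equals/hashCode. */
    record SourceConfig(String hostname, int port, String database) {}

    /** Counts schema refreshes; each increment stands in for the slow catalog query. */
    static final class RefreshCounter {
        int count;
    }

    /** Stand-in for PostgresSourceFetchTaskContext. */
    static final class FetchTaskContext {
        private final RefreshCounter counter;

        FetchTaskContext(RefreshCounter counter) {
            this.counter = counter;
        }

        /** Optimization 1: configure() is the only place the schema is refreshed. */
        void configure() {
            counter.count++;
        }

        /** Reads a chunk with the schema loaded in configure(); no further refresh. */
        void readChunk(String chunk) {
            // scan the chunk here
        }
    }

    /** Optimization 2: contexts outlive readers and are shared per source config. */
    static final class ContextCache {
        private final Map<SourceConfig, FetchTaskContext> contexts = new ConcurrentHashMap<>();
        private final RefreshCounter counter;

        ContextCache(RefreshCounter counter) {
            this.counter = counter;
        }

        FetchTaskContext getOrCreate(SourceConfig config) {
            // configure() runs only the first time a given config is seen
            return contexts.computeIfAbsent(config, key -> {
                FetchTaskContext ctx = new FetchTaskContext(counter);
                ctx.configure();
                return ctx;
            });
        }
    }

    /** Stand-in for IncrementalSourceSplitReader: looks its context up instead of building one. */
    static final class SplitReader {
        private final ContextCache cache;
        private final SourceConfig config;

        SplitReader(ContextCache cache, SourceConfig config) {
            this.cache = cache;
            this.config = config;
        }

        void handleSnapshotSplit(String chunk) {
            cache.getOrCreate(config).readChunk(chunk);
        }
    }

    /** A new reader per chunk (as when the idle fetcher is shut down), but one shared cache. */
    static int countRefreshes(SourceConfig config, List<String> chunks) {
        RefreshCounter counter = new RefreshCounter();
        ContextCache cache = new ContextCache(counter);
        for (String chunk : chunks) {
            new SplitReader(cache, config).handleSnapshotSplit(chunk);
        }
        return counter.count;
    }

    public static void main(String[] args) {
        SourceConfig config = new SourceConfig("localhost", 5432, "inventory");
        List<String> chunks = List.of("t1-chunk0", "t1-chunk1", "t2-chunk0", "t2-chunk1");
        System.out.println(countRefreshes(config, chunks)); // 1: only the first configure()
    }
}
```

Here the refresh count no longer grows with the number of chunks, even though readers are still thrown away. Sharing one context (and whatever JDBC connection it holds) across readers raises thread-safety and lifecycle questions that this sketch deliberately ignores.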
> ### Alternatives
> _No response_
> ### Anything else?
> _No response_
> ### Are you willing to submit a PR?
> - [X] I'm willing to submit a PR!
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/apache/flink-cdc/issues/2570
> Created by: [lzshlzsh|https://github.com/lzshlzsh]
> Labels: enhancement, 
> Created at: Sun Oct 22 00:28:09 CST 2023
> State: open



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
