[https://issues.apache.org/jira/browse/FLINK-38881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
tchivs updated FLINK-38881:
---------------------------
Description:
h1. PostgreSQL Partition Table Routing Support
h2. Background
When using Flink CDC to synchronize PostgreSQL partitioned tables, we encountered severe performance issues and functional defects.
h2. Problems Encountered
h3. Problem 1: Frequent Schema Refresh
*Symptom*: The schema was refreshed again and again during table lookups, causing a large number of unnecessary database queries.
*Root Cause*: {{PostgresSchema.tableFor()}} calls {{refresh()}} whenever the requested table is not in the schema, and {{refresh()}} reloads the schema for ALL tables instead of only the missing one.
*Impact*:
* High database connection pressure
* High CPU and memory consumption
* Increased synchronization latency
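To make the amplification concrete, here is a hypothetical model of the behaviour described above; it is not Debezium's {{PostgresSchema}} code. It assumes the requested table is still absent after a refresh, which is what makes every lookup trigger another full reload. The class name and the string "schemas" are placeholders.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the refresh-on-miss behaviour described in Problem 1.
 * It only counts how many table schemas get loaded; it is not Debezium code.
 */
public class RefreshOnMissModel {
    private final List<String> includedTables; // everything a full refresh reloads
    private final Map<String, String> cache = new HashMap<>();
    int loads = 0;

    RefreshOnMissModel(List<String> includedTables) {
        this.includedTables = includedTables;
    }

    private String load(String tableId) {
        loads++;
        return "schema of " + tableId;
    }

    /** Described behaviour: a cache miss reloads every included table. */
    String tableForFullRefresh(String tableId) {
        String table = cache.get(tableId);
        if (table == null) {
            cache.clear();
            for (String t : includedTables) {
                cache.put(t, load(t));
            }
            // Still null if the requested table is not among the reloaded ones,
            // so the next lookup misses and reloads everything again.
            table = cache.get(tableId);
        }
        return table;
    }

    /** Alternative: a cache miss loads only the requested table. */
    String tableForSingle(String tableId) {
        return cache.computeIfAbsent(tableId, this::load);
    }
}
```

With 100 included tables and a requested table that never appears after the refresh, three lookups cost 300 schema loads in the first variant and one in the second.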
h3. Problem 2: Full Schema Load for Single Table Query
*Symptom*: Even when the schema for a single table was requested, all tables matching {{table.include.list}} were loaded.
*Root Cause*: {{PostgresConnection.readSchema()}} filters with a {{TableFilter}}, but the underlying SQL query returns all tables and the filter is only applied in memory afterwards.
*Impact*:
* With hundreds of partition tables, every query processes metadata for all of them
* Memory usage increases dramatically
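One possible way to push the single-table restriction into the metadata query is sketched below, assuming plain JDBC {{DatabaseMetaData.getColumns}} is used; this is not Debezium's {{readSchema()}} code. The schema and table name are passed as search patterns instead of loading everything and filtering in memory. Because {{_}} and {{%}} are wildcards in JDBC search patterns, and partition names such as {{orders_202401}} contain underscores, the names are first escaped with the driver's {{getSearchStringEscape()}}. The class and method names are hypothetical.

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that reads column metadata for exactly one table. */
public class SingleTableMetadata {

    /** Escapes JDBC search-pattern wildcards so the name only matches itself. */
    static String escapePattern(String name, String escape) {
        if (escape == null || escape.isEmpty()) {
            return name; // driver reports no escape string; fall back to the raw name
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < name.length(); i++) {
            String ch = String.valueOf(name.charAt(i));
            if (ch.equals("_") || ch.equals("%") || ch.equals(escape)) {
                sb.append(escape);
            }
            sb.append(ch);
        }
        return sb.toString();
    }

    /** Returns the column names of schema.table, letting the driver filter by name. */
    static List<String> readColumns(Connection conn, String schema, String table)
            throws SQLException {
        DatabaseMetaData md = conn.getMetaData();
        String esc = md.getSearchStringEscape();
        List<String> columns = new ArrayList<>();
        try (ResultSet rs = md.getColumns(
                null, escapePattern(schema, esc), escapePattern(table, esc), null)) {
            while (rs.next()) {
                columns.add(rs.getString("COLUMN_NAME"));
            }
        }
        return columns;
    }
}
```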
h3. Problem 3: Partition Parent Table Missing Primary Key (PostgreSQL 10)
*Symptom*: In PostgreSQL 10, a partitioned (parent) table has no primary key definition, which forces users to match the child partitions individually with regex patterns.
*Root Cause*: PostgreSQL 10 allows primary keys only on child partitions; the parent table itself cannot have a PK constraint. Users therefore cannot capture the parent table directly and must use a regex to match all child partitions.
*Note*: For PostgreSQL 11+, the {{partition.discovery.enabled}} option can discover partitions automatically. For PostgreSQL 10, however, this limitation leads to Problem 4 below.
h3. Problem 4: CreateTableEvent Storm (Caused by Problem 3)
*Symptom*: When regex patterns are used to match child partitions, each partition generates its own {{CreateTableEvent}}, flooding downstream systems with table creation events.
*Root Cause*: Because of Problem 3, users must use regex patterns such as {{public\.orders_\d+}} to match all child partitions, and Flink CDC treats each matched child partition as an independent table.
*Impact*:
# *Downstream System Overload*:
#* Each child partition generates its own {{CreateTableEvent}}
#* Hundreds of partitions mean hundreds of schema change events
#* Downstream systems are overwhelmed with table creation events
#* Data is scattered across multiple tables and cannot be queried uniformly
# *Massive Schema Loading*:
#* Each single-table query triggers a full schema load for ALL tables
#* Initialization takes a very long time because the schema is fetched for every child partition
#* Heavy load on the PostgreSQL database
# *Checkpoint Timeout Risk*:
#* Schema loading takes too long
#* Database connections may time out
#* Flink checkpoints can easily time out and fail
# *Excessive Resource Consumption*:
#* {{EventSerializer}} with {{ListSerializer}} caches massive amounts of data
#* Each child table requires a separate {{SchemaChangeEvent}} to be published
#* Storing schemas for all partitions adds memory pressure
h2. Optimization Approach
h3. Approach 1: Partition Table Routing Mechanism
*Solution*: Introduce a {{partition.tables}} option that routes child partition events to the parent table.
{code}
partition.tables: "public.orders:public\.orders_\d{6}"
{code}
*Implementation*:
* {{PostgresPartitionRules}}: Parses the config and extracts the parent table and the child regex patterns
* {{PostgresPartitionRouter}}: Maps a child table ID to its parent table ID via regex matching
* During WAL event processing, the child partition {{TableId}} is automatically replaced with the parent's
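The rule parsing and routing steps above can be sketched as follows. This is a minimal illustration, not the actual {{PostgresPartitionRules}} / {{PostgresPartitionRouter}} code: table IDs are plain {{schema.table}} strings, the {{;}} separator between multiple rules is a hypothetical choice (a comma would clash with bounded quantifiers in the child regex), and each rule is split at its first colon on the assumption that parent table names contain no colon.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch of parsing "parent:childRegex" rules and routing child IDs. */
public class PartitionRoutingSketch {

    /** One rule: a parent table ID plus the regex its child partitions match. */
    static final class Rule {
        final String parent;
        final Pattern child;

        Rule(String parent, Pattern child) {
            this.parent = parent;
            this.child = child;
        }
    }

    /** Parses e.g. public.orders:public\.orders_\d{6}; ';' separates rules (assumed). */
    static List<Rule> parse(String config) {
        List<Rule> rules = new ArrayList<>();
        for (String entry : config.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int sep = trimmed.indexOf(':'); // parent IDs are assumed to contain no ':'
            if (sep <= 0 || sep == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected parent:childRegex, got: " + trimmed);
            }
            rules.add(new Rule(trimmed.substring(0, sep), Pattern.compile(trimmed.substring(sep + 1))));
        }
        return rules;
    }

    /** Returns the parent ID when the whole table ID matches a child regex, else the ID unchanged. */
    static String route(List<Rule> rules, String tableId) {
        for (Rule rule : rules) {
            if (rule.child.matcher(tableId).matches()) {
                return rule.parent;
            }
        }
        return tableId;
    }
}
```

With the example configuration, {{public.orders_202401}} is routed to {{public.orders}}, while {{public.orders_2024}} (only four digits) is left unchanged because the regex must match the whole table ID.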
h3. Approach 2: Partition-Aware Table Filter
*Solution*: Filter out child partition tables at the Debezium level and keep only the parent tables.
*Implementation*:
* {{PostgresPartitionConnectorConfig}}: Extends {{PostgresConnectorConfig}}
* {{PartitionAwareTableFilters}}: Wraps the original filter and excludes tables matching the child patterns
* Child partitions no longer appear in the {{table.include.list}} results
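A minimal sketch of the filter wrapping described above, assuming a plain {{Predicate<String>}} over {{schema.table}} IDs stands in for Debezium's table filter type; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

/** Hypothetical partition-aware wrapper around an existing table include filter. */
public class PartitionAwareFilterSketch {

    /** Keeps a table only if the original filter includes it and no child pattern matches it. */
    static Predicate<String> partitionAware(Predicate<String> original, List<Pattern> childPatterns) {
        return tableId -> original.test(tableId)
                && childPatterns.stream().noneMatch(p -> p.matcher(tableId).matches());
    }
}
```

Wrapping rather than replacing the filter keeps the user's {{table.include.list}} semantics intact; the child patterns can only remove tables, never add them.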
h3. Approach 3: Schema Routing and Caching
*Solution*: Create a dedicated schema class that supports partition table routing and caching.
*Implementation*:
* {{PostgresPartitionRoutingSchema}}: Extends {{PostgresSchema}}
* Overrides {{tableFor()}} to route child table requests to the parent
* The parent table schema is loaded once and reused by all child partitions
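The routing-plus-caching idea can be sketched as below. It is not the real {{PostgresPartitionRoutingSchema}}: the router and the single-table loader are passed in as functions (hypothetical stand-ins for the partition router and a single-table schema read), and schemas are keyed by {{schema.table}} strings.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.UnaryOperator;

/** Hypothetical schema cache that resolves child partitions to their parent table. */
public class RoutingSchemaSketch<S> {
    private final UnaryOperator<String> router;          // child ID -> parent ID, identity otherwise
    private final Function<String, S> singleTableLoader; // reads the schema of one table only
    private final Map<String, S> cache = new ConcurrentHashMap<>();

    RoutingSchemaSketch(UnaryOperator<String> router, Function<String, S> singleTableLoader) {
        this.router = router;
        this.singleTableLoader = singleTableLoader;
    }

    /** Routes the request to the parent and loads that schema at most once. */
    S tableFor(String tableId) {
        return cache.computeIfAbsent(router.apply(tableId), singleTableLoader);
    }
}
```

Because the cache key is the routed parent ID, requests for {{public.orders_202401}} and {{public.orders_202402}} share one loaded schema instead of each triggering a load.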
h3. Approach 4: Primary Key Inheritance
*Solution*: Let the parent table inherit the primary key of a representative child partition.
*Implementation*:
* Query {{pg_inherits}} to get the parent-child relationships
* Select one child partition as the representative
* Read that partition's PK definition and apply it to the parent table schema
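A minimal sketch of the primary key inheritance steps, under stated assumptions: the {{pg_inherits}} query is shown only as a constant and is not executed here, the child PK columns are supplied as a map standing in for the catalog lookup, and the representative child is assumed to be the first one in name order. Whether all children share the same key is not checked. All names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: derive a parent table PK from one representative child partition. */
public class PrimaryKeyInheritanceSketch {

    /** Lists the child partitions of a parent via pg_inherits (bind the parent name, e.g. "public.orders"). */
    static final String CHILDREN_SQL =
            "SELECT n.nspname || '.' || c.relname AS child "
                    + "FROM pg_inherits i "
                    + "JOIN pg_class c ON c.oid = i.inhrelid "
                    + "JOIN pg_namespace n ON n.oid = c.relnamespace "
                    + "WHERE i.inhparent = ?::regclass "
                    + "ORDER BY 1";

    /**
     * Picks the representative child (first in name order, an assumption) and returns
     * its primary key columns, to be applied to the parent table schema.
     */
    static List<String> inheritPrimaryKey(Map<String, List<String>> childPrimaryKeys) {
        if (childPrimaryKeys.isEmpty()) {
            throw new IllegalStateException("parent table has no child partitions");
        }
        String representative = new TreeMap<>(childPrimaryKeys).firstKey();
        return List.copyOf(childPrimaryKeys.get(representative));
    }
}
```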
h2. Core Classes
||Class||Responsibility||
|{{PostgresPartitionRules}}|Parse {{partition.tables}} config, extract parent and child regex|
|{{PostgresPartitionRouter}}|Route child TableId to parent via regex matching|
|{{PostgresPartitionConnectorConfig}}|Extend Debezium config with partition-aware table filters|
|{{PostgresPartitionRoutingSchema}}|Extend {{PostgresSchema}} with routing and PK inheritance|
|{{PatternCache}}|Regex compilation cache to avoid repeated compilation|
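{{PatternCache}} can be as small as a concurrent map from regex string to compiled {{java.util.regex.Pattern}}. The sketch below is an assumption about its shape, not the actual class; the cache is unbounded, which is reasonable only because the set of configured child patterns is small and fixed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

/** Hypothetical regex cache: each distinct expression is compiled once and then shared. */
public final class PatternCacheSketch {
    private static final Map<String, Pattern> CACHE = new ConcurrentHashMap<>();

    private PatternCacheSketch() {}

    /** Returns the compiled pattern for the regex, compiling it on first use only. */
    static Pattern get(String regex) {
        return CACHE.computeIfAbsent(regex, Pattern::compile);
    }
}
```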
h2. Performance Comparison
||Metric||Before||After||
|Schema refresh count|Every access|First load only|
|Single table query|Load all tables|Load requested table only|
|CreateTableEvent|N (N = partition count)|1 (parent table)|
|Memory usage|High (all partition schemas)|Low (parent schema only)|
|DB query pressure|High|Low|
was:
h1. PostgreSQL Partition Table Routing Support
h2. Background
When using Flink CDC to synchronize PostgreSQL partitioned tables, we encountered severe performance issues and functional defects.
h2. Problems Encountered
h3. Problem 1: Frequent Schema Refresh
*Symptom*: The schema was refreshed again and again during table lookups, causing a large number of unnecessary database queries.
*Root Cause*: {{PostgresSchema.tableFor()}} calls {{refresh()}} whenever the requested table is not in the schema, and {{refresh()}} reloads the schema for ALL tables instead of only the missing one.
*Impact*:
* High database connection pressure
* High CPU and memory consumption
* Increased synchronization latency
h3. Problem 2: Full Schema Load for Single Table Query
*Symptom*: Even when the schema for a single table was requested, all tables matching {{table.include.list}} were loaded.
*Root Cause*: {{PostgresConnection.readSchema()}} filters with a {{TableFilter}}, but the underlying SQL query returns all tables and the filter is only applied in memory afterwards.
*Impact*:
* With hundreds of partition tables, every query processes metadata for all of them
* Memory usage increases dramatically
h3. Problem 3: CreateTableEvent Storm
*Symptom*: Each child partition generates an independent {{CreateTableEvent}}, flooding downstream systems with table creation events.
*Root Cause*: Flink CDC treats each child partition as an independent table.
*Impact*:
* Downstream systems (Kafka, Doris, etc.) must handle hundreds of create table events
* Data is scattered across multiple tables and cannot be queried uniformly
* This does not match business expectations (users expect to see the parent table, not its child partitions)
h3. Problem 4: Partition Parent Table Missing Primary Key (PostgreSQL 10)
*Symptom*: In PostgreSQL 10, a partitioned (parent) table has no primary key definition, which forces users to match the child partitions individually with regex patterns.
*Root Cause*: PostgreSQL 10 allows primary keys only on child partitions; the parent table itself cannot have a PK constraint. This forces CDC to treat each child partition as a separate table.
*Impact*:
# *CreateTableEvent Storm*:
#* Each child partition generates its own {{CreateTableEvent}}
#* Hundreds of partitions mean hundreds of schema change events
#* Downstream systems are overwhelmed with table creation events
# *Massive Schema Loading*:
#* Each single-table query triggers a full schema load for ALL tables
#* Initialization takes a very long time because the schema is fetched for every child partition
#* Heavy load on the PostgreSQL database
# *Checkpoint Timeout Risk*:
#* Schema loading takes too long
#* Database connections may time out
#* Flink checkpoints can easily time out and fail
# *Excessive Resource Consumption*:
#* {{EventSerializer}} with {{ListSerializer}} caches massive amounts of data
#* Each child table requires a separate {{SchemaChangeEvent}} to be published
#* Storing schemas for all partitions adds memory pressure
h2. Optimization Approach
h3. Approach 1: Partition Table Routing Mechanism
*Solution*: Introduce a {{partition.tables}} option that routes child partition events to the parent table.
{code}
partition.tables: "public.orders:public\.orders_\d{6}"
{code}
*Implementation*:
* {{PostgresPartitionRules}}: Parses the config and extracts the parent table and the child regex patterns
* {{PostgresPartitionRouter}}: Maps a child table ID to its parent table ID via regex matching
* During WAL event processing, the child partition {{TableId}} is automatically replaced with the parent's
h3. Approach 2: Partition-Aware Table Filter
*Solution*: Filter out child partition tables at the Debezium level and keep only the parent tables.
*Implementation*:
* {{PostgresPartitionConnectorConfig}}: Extends {{PostgresConnectorConfig}}
* {{PartitionAwareTableFilters}}: Wraps the original filter and excludes tables matching the child patterns
* Child partitions no longer appear in the {{table.include.list}} results
h3. Approach 3: Schema Routing and Caching
*Solution*: Create a dedicated schema class that supports partition table routing and caching.
*Implementation*:
* {{PostgresPartitionRoutingSchema}}: Extends {{PostgresSchema}}
* Overrides {{tableFor()}} to route child table requests to the parent
* The parent table schema is loaded once and reused by all child partitions
h3. Approach 4: Primary Key Inheritance
*Solution*: Let the parent table inherit the primary key of a representative child partition.
*Implementation*:
* Query {{pg_inherits}} to get the parent-child relationships
* Select one child partition as the representative
* Read that partition's PK definition and apply it to the parent table schema
h2. Core Classes
||Class||Responsibility||
|{{PostgresPartitionRules}}|Parse {{partition.tables}} config, extract parent and child regex|
|{{PostgresPartitionRouter}}|Route child TableId to parent via regex matching|
|{{PostgresPartitionConnectorConfig}}|Extend Debezium config with partition-aware table filters|
|{{PostgresPartitionRoutingSchema}}|Extend {{PostgresSchema}} with routing and PK inheritance|
|{{PatternCache}}|Regex compilation cache to avoid repeated compilation|
h2. Performance Comparison
||Metric||Before||After||
|Schema refresh count|Every access|First load only|
|Single table query|Load all tables|Load requested table only|
|CreateTableEvent|N (N = partition count)|1 (parent table)|
|Memory usage|High (all partition schemas)|Low (parent schema only)|
|DB query pressure|High|Low|
> PostgreSQL Partition Table Routing Support
> ------------------------------------------
>
> Key: FLINK-38881
> URL: https://issues.apache.org/jira/browse/FLINK-38881
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Reporter: tchivs
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)