[ https://issues.apache.org/jira/browse/FLINK-38844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
tchivs updated FLINK-38844:
---------------------------
Description:
h2. Component
*Flink CDC / Pipeline Connectors / PostgreSQL*
h2. Summary
Add metadata column support for PostgreSQL Pipeline Connector
h2. Description
h3. Background
The PostgreSQL CDC source connector for Flink SQL ({{postgres-cdc}}) already supports the metadata columns {{op_ts}}, {{database_name}}, {{schema_name}}, {{table_name}} and {{row_kind}} through {{PostgreSQLReadableMetadata}}, but the PostgreSQL Pipeline Connector does not expose any of these metadata columns to users.
h3. Problem
Users cannot access metadata (such as the operation timestamp, database name, schema name or table name) when using the PostgreSQL Pipeline Connector in their data pipelines. This limits their ability to:
* Track when data changes occurred
* Identify the source database/schema/table of records
* Implement metadata-based routing or filtering logic
h3. Proposed Solution
Implement metadata column support for the PostgreSQL Pipeline Connector as follows:
# *Create metadata column implementations* for the pipeline connector:
#* {{OpTsMetadataColumn}}: provides the operation timestamp (milliseconds since epoch)
#* {{DatabaseNameMetadataColumn}}: provides the source database name
#* {{SchemaNameMetadataColumn}}: provides the source schema name
#* {{TableNameMetadataColumn}}: provides the source table name
# *Update PostgresDataSource* to expose the supported metadata columns via its {{supportedMetadataColumns()}} method
# *Add tests* that verify the metadata values in both the snapshot and incremental phases
# *Update documentation* to show users how to use metadata columns
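A minimal sketch of the four column implementations and the list that {{supportedMetadataColumns()}} would return is shown below. {{MetadataColumn}} and its methods ({{getName}}, {{getType}}, {{read}}) are a hypothetical stand-in for the pipeline API's metadata column interface, and the data types are plain strings for brevity; none of this is the actual Flink CDC API. It also assumes each record's metadata reaches the columns as a {{Map<String, String>}} keyed by metadata name.

{code:java}
import java.util.List;
import java.util.Map;

// Sketch only: MetadataColumn is a hypothetical stand-in, not the Flink CDC interface.
class MetadataColumnsSketch {

    interface MetadataColumn {
        String getName();                          // metadata key, e.g. "op_ts"
        String getType();                          // SQL type as text (simplified)
        Object read(Map<String, String> metadata); // value for one record
    }

    // Fails loudly if the deserializer did not supply a NOT NULL value.
    static String require(Map<String, String> metadata, String key) {
        String value = metadata.get(key);
        if (value == null) {
            throw new IllegalArgumentException(key + " is missing from metadata: " + metadata);
        }
        return value;
    }

    static final class OpTsMetadataColumn implements MetadataColumn {
        public String getName() { return "op_ts"; }
        public String getType() { return "BIGINT NOT NULL"; }
        // Milliseconds since epoch; 0 for snapshot records.
        public Object read(Map<String, String> metadata) {
            return Long.parseLong(require(metadata, getName()));
        }
    }

    // Common base for the three STRING NOT NULL name columns.
    abstract static class StringMetadataColumn implements MetadataColumn {
        private final String name;
        StringMetadataColumn(String name) { this.name = name; }
        public String getName() { return name; }
        public String getType() { return "STRING NOT NULL"; }
        public Object read(Map<String, String> metadata) { return require(metadata, name); }
    }

    static final class DatabaseNameMetadataColumn extends StringMetadataColumn {
        DatabaseNameMetadataColumn() { super("database_name"); }
    }

    static final class SchemaNameMetadataColumn extends StringMetadataColumn {
        SchemaNameMetadataColumn() { super("schema_name"); }
    }

    static final class TableNameMetadataColumn extends StringMetadataColumn {
        TableNameMetadataColumn() { super("table_name"); }
    }

    // What PostgresDataSource#supportedMetadataColumns() would expose in this sketch.
    static List<MetadataColumn> supportedMetadataColumns() {
        return List.of(
                new OpTsMetadataColumn(),
                new DatabaseNameMetadataColumn(),
                new SchemaNameMetadataColumn(),
                new TableNameMetadataColumn());
    }

    public static void main(String[] args) {
        Map<String, String> metadata = Map.of(
                "op_ts", "1704182400000",
                "database_name", "mydb",
                "schema_name", "public",
                "table_name", "orders");
        for (MetadataColumn column : supportedMetadataColumns()) {
            System.out.println(column.getName() + " " + column.getType() + " = " + column.read(metadata));
        }
    }
}
{code}

Sharing a {{StringMetadataColumn}} base keeps each of the three name columns to a one-line subclass while preserving the class names proposed above.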
h3. Implementation Details
h4. Metadata Columns
The following metadata columns will be supported:
||Metadata Key||Data Type||Description||
|{{op_ts}}|BIGINT NOT NULL|Operation timestamp in milliseconds since epoch. Returns 0 for snapshot records and the actual change timestamp for incremental records|
|{{database_name}}|STRING NOT NULL|Name of the source database|
|{{schema_name}}|STRING NOT NULL|Name of the source schema|
|{{table_name}}|STRING NOT NULL|Name of the source table|
*Note:* {{row_kind}} is not included because its value comes from the converted {{RowData}}, whereas the other metadata columns are read directly from the {{SourceRecord}}.
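The values in the table above would come from the {{source}} block of each Debezium change event, whose PostgreSQL fields include {{db}}, {{schema}}, {{table}} and {{ts_ms}}. The sketch below shows one way a deserializer method (called {{getMetadata()}} here) could map those fields to the metadata keys. To stay dependency-free it models the {{source}} Struct as a plain {{Map}}, and it assumes the caller already knows whether the record came from the snapshot phase; both are simplifications, not the connector's actual code.

{code:java}
import java.util.Map;
import java.util.TreeMap;

// Sketch only: `source` is a Map standing in for Debezium's source Struct.
class PostgresMetadataSketch {

    // Field names in the Debezium "source" block of PostgreSQL change events.
    static final String DB = "db";
    static final String SCHEMA = "schema";
    static final String TABLE = "table";
    static final String TS_MS = "ts_ms";

    static Object field(Map<String, Object> source, String key) {
        Object value = source.get(key);
        if (value == null) {
            throw new IllegalArgumentException("source block has no '" + key + "' field");
        }
        return value;
    }

    // Maps the source block to metadata keys; snapshot records report op_ts = 0.
    static Map<String, String> getMetadata(Map<String, Object> source, boolean fromSnapshot) {
        Map<String, String> metadata = new TreeMap<>(); // sorted keys for stable output
        metadata.put("database_name", field(source, DB).toString());
        metadata.put("schema_name", field(source, SCHEMA).toString());
        metadata.put("table_name", field(source, TABLE).toString());
        long opTs = fromSnapshot ? 0L : ((Number) field(source, TS_MS)).longValue();
        metadata.put("op_ts", Long.toString(opTs));
        return metadata;
    }

    public static void main(String[] args) {
        Map<String, Object> source = Map.of(
                DB, "mydb", SCHEMA, "public", TABLE, "orders", TS_MS, 1704182400000L);
        // {database_name=mydb, op_ts=1704182400000, schema_name=public, table_name=orders}
        System.out.println(getMetadata(source, false));
        // {database_name=mydb, op_ts=0, schema_name=public, table_name=orders}
        System.out.println(getMetadata(source, true));
    }
}
{code}

Passing the phase in explicitly keeps the sketch independent of how the connector actually distinguishes snapshot records from change-stream records.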
h3. Usage Example
{code:yaml}
source:
  type: postgres
  hostname: localhost
  port: 5432
  username: postgres
  password: postgres
  database-name: mydb
  schema-name: public
  table-name: orders

transform:
  - source-table: public.orders
    projection: id, order_date, amount, op_ts, database_name, schema_name, table_name
    description: Include metadata columns in output

sink:
  type: doris
  # ... sink configuration
{code}
h3. Expected Output
The output records will include metadata columns:
||id||order_date||amount||op_ts||database_name||schema_name||table_name||
|1|2024-01-01|100.00|0|mydb|public|orders|
|2|2024-01-02|200.00|1704182400000|mydb|public|orders|
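*Note:* {{op_ts}} is 0 for snapshot records and holds the actual change timestamp for incremental records; in the example above, row 1 was read during the snapshot phase and row 2 from the change stream.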
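Because {{op_ts}} is plain epoch milliseconds with 0 as a snapshot sentinel, a downstream consumer that wants a readable time probably needs to treat 0 as "no change time" rather than as 1970-01-01. A small sketch of that conversion using {{java.time}}, where {{toChangeTime}} is a hypothetical helper name:

{code:java}
import java.time.Instant;
import java.util.Optional;

class OpTsSketch {

    // Hypothetical helper: empty for snapshot records (op_ts == 0), otherwise the change time.
    static Optional<Instant> toChangeTime(long opTs) {
        return opTs == 0L ? Optional.empty() : Optional.of(Instant.ofEpochMilli(opTs));
    }

    public static void main(String[] args) {
        System.out.println(toChangeTime(1704182400000L)); // Optional[2024-01-02T08:00:00Z]
        System.out.println(toChangeTime(0L));             // Optional.empty
    }
}
{code}

For example, the incremental row above ({{op_ts}} = 1704182400000) corresponds to 2024-01-02T08:00:00Z.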
h3. Fix Version
* 3.6.0 (or the next release)
was:
h2. Component
*Flink CDC / Pipeline Connectors / PostgreSQL*
h2. Summary
Add metadata column support for PostgreSQL Pipeline Connector
h2. Description
h3. Background
The PostgreSQL CDC source connector for Flink SQL ({{postgres-cdc}}) already supports the metadata columns {{op_ts}}, {{database_name}}, {{schema_name}}, {{table_name}} and {{row_kind}} through {{PostgreSQLReadableMetadata}}, but the PostgreSQL Pipeline Connector does not expose any of these metadata columns to users.
h3. Problem
Users cannot access metadata (such as the operation timestamp, database name, schema name or table name) when using the PostgreSQL Pipeline Connector in their data pipelines. This limits their ability to:
* Track when data changes occurred
* Identify the source database/schema/table of records
* Implement metadata-based routing or filtering logic
h3. Proposed Solution
Implement metadata column support for the PostgreSQL Pipeline Connector as follows:
# *Create metadata column implementations* for the pipeline connector:
#* {{OpTsMetadataColumn}}: provides the operation timestamp (milliseconds since epoch)
#* {{DatabaseNameMetadataColumn}}: provides the source database name
#* {{SchemaNameMetadataColumn}}: provides the source schema name
#* {{TableNameMetadataColumn}}: provides the source table name
# *Update PostgresDataSource* to expose the supported metadata columns via its {{supportedMetadataColumns()}} method
# *Add tests* that verify the metadata values in both the snapshot and incremental phases
# *Update documentation* to show users how to use metadata columns
h3. Implementation Details
h4. Metadata Columns
The following metadata columns will be supported:
||Metadata Key||Data Type||Description||
|{{op_ts}}|BIGINT NOT NULL|Operation timestamp in milliseconds since epoch. Returns 0 for snapshot records and the actual change timestamp for incremental records|
|{{database_name}}|STRING NOT NULL|Name of the source database|
|{{schema_name}}|STRING NOT NULL|Name of the source schema|
|{{table_name}}|STRING NOT NULL|Name of the source table|
*Note:* {{row_kind}} is not included because its value comes from the converted {{RowData}}, whereas the other metadata columns are read directly from the {{SourceRecord}}.
h4. Architecture
{noformat}
PostgresDataSource
├── supportedMetadataColumns()
│   ├── OpTsMetadataColumn
│   ├── DatabaseNameMetadataColumn
│   ├── SchemaNameMetadataColumn
│   └── TableNameMetadataColumn
└── PostgresEventDeserializer
    └── getMetadata() - reads metadata from SourceRecord
{noformat}
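The flow in this diagram can be sketched end to end: the data source advertises its metadata columns, the deserializer supplies a metadata map per record, and a projection like the one in the pipeline configuration resolves each requested name either from the row or from a metadata reader. Everything here is illustrative: the readers are plain {{java.util.function.Function}} values rather than the proposed column classes, row values are simplified to strings and integers, and {{project}} is a hypothetical helper, not the Flink CDC transform implementation.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class MetadataFlowSketch {

    // Stand-in for supportedMetadataColumns(): metadata key -> reader over the metadata map.
    static Map<String, Function<Map<String, String>, Object>> supportedMetadataColumns() {
        Map<String, Function<Map<String, String>, Object>> columns = new LinkedHashMap<>();
        columns.put("op_ts", md -> Long.parseLong(md.get("op_ts")));
        columns.put("database_name", md -> md.get("database_name"));
        columns.put("schema_name", md -> md.get("schema_name"));
        columns.put("table_name", md -> md.get("table_name"));
        return columns;
    }

    // Hypothetical projection step: physical columns come from the row,
    // metadata columns are resolved through the readers above.
    static List<Object> project(List<String> projection, Map<String, Object> row,
                                Map<String, String> metadata) {
        Map<String, Function<Map<String, String>, Object>> readers = supportedMetadataColumns();
        List<Object> out = new ArrayList<>();
        for (String name : projection) {
            if (row.containsKey(name)) {
                out.add(row.get(name));
            } else if (readers.containsKey(name)) {
                out.add(readers.get(name).apply(metadata));
            } else {
                throw new IllegalArgumentException("Unknown column: " + name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("id", 2, "order_date", "2024-01-02", "amount", "200.00");
        Map<String, String> metadata = Map.of("op_ts", "1704182400000", "database_name", "mydb",
                "schema_name", "public", "table_name", "orders");
        List<String> projection = List.of("id", "order_date", "amount", "op_ts",
                "database_name", "schema_name", "table_name");
        // [2, 2024-01-02, 200.00, 1704182400000, mydb, public, orders]
        System.out.println(project(projection, row, metadata));
    }
}
{code}

In this sketch a physical column shadows a metadata column of the same name; the issue does not say how the real transform resolves such a clash.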
h2. Usage Example
h3. Pipeline Configuration
{code:yaml}
source:
  type: postgres
  hostname: localhost
  port: 5432
  username: postgres
  password: postgres
  database-name: mydb
  schema-name: public
  table-name: orders

transform:
  - source-table: public.orders
    projection: id, order_date, amount, op_ts, database_name, schema_name, table_name
    description: Include metadata columns in output

sink:
  type: doris
  # ... sink configuration
{code}
h3. Expected Output
The output records will include metadata columns:
||id||order_date||amount||op_ts||database_name||schema_name||table_name||
|1|2024-01-01|100.00|0|mydb|public|orders|
|2|2024-01-02|200.00|1704182400000|mydb|public|orders|
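*Note:* {{op_ts}} is 0 for snapshot records and holds the actual change timestamp for incremental records; in the example above, row 1 was read during the snapshot phase and row 2 from the change stream.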
h3. Fix Version
* 3.6.0 (or the next release)
> Add metadata column support for PostgreSQL Pipeline Connector
> -------------------------------------------------------------
>
> Key: FLINK-38844
> URL: https://issues.apache.org/jira/browse/FLINK-38844
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Reporter: tchivs
> Priority: Minor
--
This message was sent by Atlassian Jira
(v8.20.10#820010)