[ https://issues.apache.org/jira/browse/FLINK-38844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-38844:
-----------------------------------
    Labels: pull-request-available  (was: )

> Add metadata column support for PostgreSQL Pipeline Connector
> -------------------------------------------------------------
>
>                 Key: FLINK-38844
>                 URL: https://issues.apache.org/jira/browse/FLINK-38844
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: tchivs
>            Priority: Minor
>              Labels: pull-request-available
>
> h2. Component
> *Flink CDC / Pipeline Connectors / PostgreSQL*
> h2. Summary
> Add metadata column support for PostgreSQL Pipeline Connector
> h2. Description
> h3. Background
> Currently, the PostgreSQL Source Connector supports metadata columns (op_ts, 
> database_name, schema_name, table_name, row_kind) through 
> {{PostgreSQLReadableMetadata}}, but the PostgreSQL Pipeline Connector 
> does not expose these metadata columns to users.
> h3. Problem
> Users cannot access metadata (such as the operation timestamp, database 
> name, schema name, or table name) when using the PostgreSQL Pipeline 
> Connector in their data pipelines. This limits their ability to:
>  * Track when data changes occurred
>  * Identify the source database/schema/table of records
>  * Implement metadata-based routing or filtering logic
> h3. Proposed Solution
> Implement metadata column support for the PostgreSQL Pipeline Connector by:
>  # *Create metadata column implementations* for the pipeline connector:
>  #* {{OpTsMetadataColumn}}: Provides the operation timestamp (milliseconds since epoch)
>  #* {{DatabaseNameMetadataColumn}}: Provides the source database name
>  #* {{SchemaNameMetadataColumn}}: Provides the source schema name
>  #* {{TableNameMetadataColumn}}: Provides the source table name
>  # *Update {{PostgresDataSource}}* to expose the supported metadata columns 
> via its {{supportedMetadataColumns()}} method
>  # *Add comprehensive tests* to verify metadata values in both the 
> snapshot and incremental phases
>  # *Update the documentation* to explain how to use metadata columns
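> Steps 1 and 2 could take roughly the following shape. This is a 
> self-contained sketch, not the Flink CDC API: {{MetadataColumnSketch}}, 
> {{OpTsColumn}}, {{StringColumn}} and {{PostgresDataSourceSketch}} are 
> hypothetical names, the string-keyed metadata map is an assumption, and the 
> three string-valued columns are folded into one parameterized class for 
> brevity instead of the separate classes listed above.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified contract for a pipeline metadata column (not the real Flink CDC interface).
interface MetadataColumnSketch {
    String name();

    String type();

    // Reads the column value from a metadata map extracted from the change record.
    Object read(Map<String, String> metadata);
}

// op_ts: BIGINT NOT NULL, milliseconds since epoch.
final class OpTsColumn implements MetadataColumnSketch {
    public String name() {
        return "op_ts";
    }

    public String type() {
        return "BIGINT NOT NULL";
    }

    public Object read(Map<String, String> metadata) {
        String value = metadata.get(name());
        if (value == null) {
            // NOT NULL column: a missing key is an error rather than a null value.
            throw new IllegalArgumentException("op_ts missing from metadata: " + metadata);
        }
        return Long.parseLong(value);
    }
}

// database_name, schema_name and table_name: STRING NOT NULL, copied as-is.
final class StringColumn implements MetadataColumnSketch {
    private final String name;

    StringColumn(String name) {
        this.name = name;
    }

    public String name() {
        return name;
    }

    public String type() {
        return "STRING NOT NULL";
    }

    public Object read(Map<String, String> metadata) {
        String value = metadata.get(name);
        if (value == null) {
            throw new IllegalArgumentException(name + " missing from metadata: " + metadata);
        }
        return value;
    }
}

// Hypothetical data source exposing the four supported metadata columns (step 2).
final class PostgresDataSourceSketch {
    List<MetadataColumnSketch> supportedMetadataColumns() {
        return List.of(
                new OpTsColumn(),
                new StringColumn("database_name"),
                new StringColumn("schema_name"),
                new StringColumn("table_name"));
    }
}
```

> Failing fast on a missing key follows from every column being declared 
> NOT NULL in the table below.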
> h3. Implementation Details
> h4. Metadata Columns
> The following metadata columns will be supported:
> ||Metadata Key||Data Type||Description||
> |{{op_ts}}|BIGINT NOT NULL|Operation timestamp in milliseconds since epoch: 0 for snapshot records, the actual change timestamp for incremental records|
> |{{database_name}}|STRING NOT NULL|Name of the source database|
> |{{schema_name}}|STRING NOT NULL|Name of the source schema|
> |{{table_name}}|STRING NOT NULL|Name of the source table|
> *Note:* {{row_kind}} metadata is not included because its value comes from 
> the {{RowData}} change kind and cannot be read from the {{SourceRecord}} 
> the way the other metadata columns can.
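> One way to populate that metadata from a change record, consistent with the 
> table above, is sketched below. {{SourceInfoSketch}} and 
> {{MetadataExtractor}} are hypothetical; their fields mirror the {{db}}, 
> {{schema}}, {{table}} and {{ts_ms}} entries of Debezium's PostgreSQL 
> {{source}} block, but the actual connector may obtain these values 
> differently. {{row_kind}} is intentionally absent, matching the note above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical view of where a change record came from and when it happened.
final class SourceInfoSketch {
    final String db;
    final String schema;
    final String table;
    final long tsMs;        // change timestamp in epoch milliseconds
    final boolean snapshot; // true if the record was read during the snapshot phase

    SourceInfoSketch(String db, String schema, String table, long tsMs, boolean snapshot) {
        this.db = db;
        this.schema = schema;
        this.table = table;
        this.tsMs = tsMs;
        this.snapshot = snapshot;
    }
}

final class MetadataExtractor {
    // Builds the string-keyed metadata map that the metadata columns read from.
    static Map<String, String> extract(SourceInfoSketch source) {
        Map<String, String> metadata = new HashMap<>();
        // Per the table above: 0 for snapshot records, the change timestamp otherwise.
        metadata.put("op_ts", Long.toString(source.snapshot ? 0L : source.tsMs));
        metadata.put("database_name", source.db);
        metadata.put("schema_name", source.schema);
        metadata.put("table_name", source.table);
        // row_kind is not added: it describes the row's change kind, not the source record.
        return metadata;
    }
}
```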
> h3. Example Configuration
> {code:yaml}
> source:
>   type: postgres
>   hostname: localhost
>   port: 5432
>   username: postgres
>   password: postgres
>   database-name: mydb
>   schema-name: public
>   table-name: orders
> transform:
>   - source-table: public.orders
>     projection: id, order_date, amount, op_ts, database_name, schema_name, table_name
>     description: Include metadata columns in output
> sink:
>   type: doris
>   # ... sink configuration
> {code}
> h3. Expected Output
> The output records will include metadata columns:
> ||id||order_date||amount||op_ts||database_name||schema_name||table_name||
> |1|2024-01-01|100.00|0|mydb|public|orders|
> |2|2024-01-02|200.00|1704182400000|mydb|public|orders|
> *Note:* Row 1 was read in the snapshot phase, so its {{op_ts}} is 0; row 2 
> was captured in the incremental phase and carries its actual change timestamp.
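> How the projection combines physical and metadata columns into rows like 
> those above can be illustrated with a small sketch. 
> {{ProjectionSketch.project}} is hypothetical and does not describe how the 
> Flink CDC transform module is implemented; it assumes physical columns are 
> resolved before metadata columns and that only {{op_ts}} needs converting 
> (to a {{long}}, matching its BIGINT type).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ProjectionSketch {
    // Resolves each projected name against the physical row first, then the metadata map.
    static List<Object> project(List<String> projection, Map<String, Object> row, Map<String, String> metadata) {
        List<Object> out = new ArrayList<>();
        for (String column : projection) {
            if (row.containsKey(column)) {
                out.add(row.get(column));
            } else if (metadata.containsKey(column)) {
                String value = metadata.get(column);
                if (column.equals("op_ts")) {
                    out.add(Long.valueOf(value)); // BIGINT metadata column
                } else {
                    out.add(value); // STRING metadata columns
                }
            } else {
                throw new IllegalArgumentException("Unknown column: " + column);
            }
        }
        return out;
    }
}
```

> Projecting row 1 with snapshot metadata ({{op_ts}} = "0") yields 
> {{[1, 2024-01-01, 100.00, 0, mydb, public, orders]}}.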
> h3. Fix Version
>  * 3.6.0 (or the next release)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
