tchivs created FLINK-38844:
------------------------------

             Summary: Add metadata column support for PostgreSQL Pipeline Connector
                 Key: FLINK-38844
                 URL: https://issues.apache.org/jira/browse/FLINK-38844
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
    Affects Versions: cdc-3.5.0
            Reporter: tchivs


h2. Component

*Flink CDC / Pipeline Connectors / PostgreSQL*
h2. Summary

Add metadata column support for PostgreSQL Pipeline Connector
h2. Description
h3. Background

Currently, the PostgreSQL Source Connector supports metadata columns ({{op_ts}}, {{database_name}}, {{schema_name}}, {{table_name}}, {{row_kind}}) through {{PostgreSQLReadableMetadata}}, but the PostgreSQL Pipeline Connector does not expose these metadata columns to users.
h3. Problem

Users cannot access metadata (such as the operation timestamp, database name, schema name, or table name) when using the PostgreSQL Pipeline Connector in their data pipelines. This limits their ability to:
 * Track when data changes occurred
 * Identify the source database/schema/table of records
 * Implement metadata-based routing or filtering logic

h3. Proposed Solution

Implement metadata column support for the PostgreSQL Pipeline Connector by:
# *Create metadata column implementations* for the pipeline connector:
#* {{OpTsMetadataColumn}}: provides the operation timestamp (milliseconds since epoch)
#* {{DatabaseNameMetadataColumn}}: provides the source database name
#* {{SchemaNameMetadataColumn}}: provides the source schema name
#* {{TableNameMetadataColumn}}: provides the source table name
# *Update PostgresDataSource* to expose the supported metadata columns via the {{supportedMetadataColumns()}} method
# *Add comprehensive tests* to verify metadata functionality in both the snapshot and incremental phases
# *Update documentation* to guide users on how to use metadata columns
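Steps 1 and 2 can be sketched in plain Java as follows. This is a self-contained illustration under stated assumptions, not the actual Flink CDC API: {{MetadataColumnSketch}}, its methods, the {{NameMetadataColumn}} base class, {{PostgresDataSourceSketch}}, and the idea that each record's metadata arrives as a string-keyed map are all hypothetical. The real pipeline interface may use different names and Flink CDC data types.

```java
import java.util.Map;

/** Hypothetical metadata column contract (assumed shape, not the Flink CDC interface). */
interface MetadataColumnSketch {
    String name();                               // key usable in transforms, e.g. "op_ts"
    String typeName();                           // type string, for illustration only
    Object read(Map<String, String> metadata);   // extracts this column's value per record
}

final class OpTsMetadataColumn implements MetadataColumnSketch {
    public String name() { return "op_ts"; }
    public String typeName() { return "BIGINT NOT NULL"; }
    public Object read(Map<String, String> metadata) {
        // Assumption: the deserializer stores op_ts as a decimal string;
        // an absent value is reported as 0, matching the snapshot behavior.
        String ts = metadata.get("op_ts");
        return ts == null ? 0L : Long.parseLong(ts);
    }
}

/** Shared logic for the three NOT NULL name-valued columns. */
abstract class NameMetadataColumn implements MetadataColumnSketch {
    private final String key;

    NameMetadataColumn(String key) { this.key = key; }

    public String name() { return key; }
    public String typeName() { return "STRING NOT NULL"; }
    public Object read(Map<String, String> metadata) {
        String value = metadata.get(key);
        if (value == null) {
            // NOT NULL column: a missing value signals a deserializer bug.
            throw new IllegalStateException("missing metadata: " + key);
        }
        return value;
    }
}

final class DatabaseNameMetadataColumn extends NameMetadataColumn {
    DatabaseNameMetadataColumn() { super("database_name"); }
}

final class SchemaNameMetadataColumn extends NameMetadataColumn {
    SchemaNameMetadataColumn() { super("schema_name"); }
}

final class TableNameMetadataColumn extends NameMetadataColumn {
    TableNameMetadataColumn() { super("table_name"); }
}

/** Hypothetical stand-in for PostgresDataSource; only the metadata hook is modeled. */
final class PostgresDataSourceSketch {
    static MetadataColumnSketch[] supportedMetadataColumns() {
        return new MetadataColumnSketch[] {
            new OpTsMetadataColumn(),
            new DatabaseNameMetadataColumn(),
            new SchemaNameMetadataColumn(),
            new TableNameMetadataColumn()
        };
    }
}
```

Sharing one base class for the three name columns keeps their NOT NULL check in a single place; only {{op_ts}} needs its own parsing and default.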

h3. Implementation Details
h4. Metadata Columns

The following metadata columns will be supported:
||Metadata Key||Data Type||Description||
|{{op_ts}}|BIGINT NOT NULL|Operation timestamp in milliseconds since epoch. Returns 0 for snapshot records and the actual change timestamp for incremental records|
|{{database_name}}|STRING NOT NULL|Name of the source database|
|{{schema_name}}|STRING NOT NULL|Name of the source schema|
|{{table_name}}|STRING NOT NULL|Name of the source table|

*Note:* {{row_kind}} metadata is not included because it is read from the emitted {{RowData}}, not from the {{SourceRecord}} like the other metadata columns.
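The following toy model illustrates the reasoning behind this note. In the Flink SQL source connector, one update change record is emitted as an {{UPDATE_BEFORE}} row and an {{UPDATE_AFTER}} row. Both rows share the record-level metadata but differ in row kind, so {{row_kind}} cannot be a single value read from the {{SourceRecord}}. {{RowKindSketch}}, {{EmittedRow}}, and {{expand}} are hypothetical names, and the enum only mirrors Flink's row kinds. The op codes are Debezium's ({{r}} snapshot read, {{c}} create, {{u}} update, {{d}} delete).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative model with hypothetical names: row_kind belongs to each emitted
// row, while table_name, op_ts, and similar values belong to the change record.
final class RowKindSketch {
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** One downstream row: its kind plus the record-level metadata it inherits. */
    static final class EmittedRow {
        final RowKind kind;
        final Map<String, String> metadata;

        EmittedRow(RowKind kind, Map<String, String> metadata) {
            this.kind = kind;
            this.metadata = metadata;
        }
    }

    /** Expands one change record, identified by its Debezium op code, into rows. */
    static List<EmittedRow> expand(String op, Map<String, String> recordMetadata) {
        List<EmittedRow> rows = new ArrayList<>();
        switch (op) {
            case "r": // snapshot read
            case "c": // insert
                rows.add(new EmittedRow(RowKind.INSERT, recordMetadata));
                break;
            case "u":
                // One update record yields two rows with different kinds,
                // yet both carry the same record-level metadata.
                rows.add(new EmittedRow(RowKind.UPDATE_BEFORE, recordMetadata));
                rows.add(new EmittedRow(RowKind.UPDATE_AFTER, recordMetadata));
                break;
            case "d":
                rows.add(new EmittedRow(RowKind.DELETE, recordMetadata));
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
        return rows;
    }
}
```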
h4. Architecture
{noformat}
PostgresDataSource
  ├── supportedMetadataColumns()
  │   ├── OpTsMetadataColumn
  │   ├── DatabaseNameMetadataColumn
  │   ├── SchemaNameMetadataColumn
  │   └── TableNameMetadataColumn
  └── PostgresEventDeserializer
      └── getMetadata() - reads metadata from SourceRecord
{noformat}
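The {{getMetadata()}} step could look roughly like the sketch below. It assumes the Debezium {{source}} block of a {{SourceRecord}} is available as a plain map keyed by the Debezium PostgreSQL source fields {{db}}, {{schema}}, {{table}}, and {{ts_ms}}. The class name, the {{snapshotRecord}} flag, and the string-valued result map are hypothetical simplifications, not the real deserializer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for PostgresEventDeserializer#getMetadata. The Debezium
// "source" block is modeled as a map using the db/schema/table/ts_ms field names.
final class PostgresMetadataReaderSketch {

    static Map<String, String> getMetadata(Map<String, Object> sourceBlock, boolean snapshotRecord) {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("database_name", requireString(sourceBlock, "db"));
        metadata.put("schema_name", requireString(sourceBlock, "schema"));
        metadata.put("table_name", requireString(sourceBlock, "table"));
        // Per the proposal, snapshot records report 0 and change records report
        // ts_ms. Assumption: a missing ts_ms also falls back to 0.
        Object tsMs = sourceBlock.get("ts_ms");
        long opTs = (snapshotRecord || tsMs == null) ? 0L : ((Number) tsMs).longValue();
        metadata.put("op_ts", Long.toString(opTs));
        return metadata;
    }

    /** Fails fast because the name columns are declared NOT NULL. */
    private static String requireString(Map<String, Object> block, String field) {
        Object value = block.get(field);
        if (value == null) {
            throw new IllegalStateException("source block has no field: " + field);
        }
        return value.toString();
    }
}
```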
h2. Usage Example
h3. Pipeline Configuration

{code:yaml}
source:
  type: postgres
  hostname: localhost
  port: 5432
  username: postgres
  password: postgres
  database-name: mydb
  schema-name: public
  table-name: orders

transform:
  - source-table: public.orders
    projection: id, order_date, amount, op_ts, database_name, schema_name, table_name
    description: Include metadata columns in output

sink:
  type: doris
  # ... sink configuration

{code}
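Conceptually, the projection above resolves each listed name against the row's physical columns and falls back to the record's metadata. The sketch below models only that name lookup. {{ProjectionSketch}} and {{project}} are hypothetical, and the real transform module also evaluates expressions, which this does not attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of a column-only projection: physical columns are
// looked up first, then metadata columns such as op_ts or table_name.
final class ProjectionSketch {
    static List<Object> project(String projection,
                                Map<String, Object> physical,
                                Map<String, Object> metadata) {
        List<Object> out = new ArrayList<>();
        for (String raw : projection.split(",")) {
            String name = raw.trim();
            if (physical.containsKey(name)) {
                out.add(physical.get(name));
            } else if (metadata.containsKey(name)) {
                out.add(metadata.get(name));
            } else {
                throw new IllegalArgumentException("unknown column: " + name);
            }
        }
        return out;
    }
}
```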
h3. Expected Output

The output records will include metadata columns:
||id||order_date||amount||op_ts||database_name||schema_name||table_name||
|1|2024-01-01|100.00|0|mydb|public|orders|
|2|2024-01-02|200.00|1704182400000|mydb|public|orders|

*Note:* {{op_ts}} is 0 for snapshot records and contains the actual change timestamp for incremental records.
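To interpret {{op_ts}} values like those in the table, treat 0 as the snapshot marker and any other value as epoch milliseconds. The helper below is a hypothetical illustration that uses only {{java.time.Instant}}.

```java
import java.time.Instant;

// Hypothetical helper: 0 marks a snapshot row; any other op_ts value is
// converted from epoch milliseconds to an ISO-8601 UTC instant.
final class OpTsSketch {
    static String describe(long opTs) {
        return opTs == 0L ? "snapshot" : Instant.ofEpochMilli(opTs).toString();
    }
}
```

For the second example row, {{describe(1704182400000L)}} returns {{2024-01-02T08:00:00Z}}.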
h3. Fix Version
 * 3.6.0 (or next release version)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
