[
https://issues.apache.org/jira/browse/FLINK-38844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-38844:
-----------------------------------
Labels: pull-request-available (was: )
> Add metadata column support for PostgreSQL Pipeline Connector
> -------------------------------------------------------------
>
> Key: FLINK-38844
> URL: https://issues.apache.org/jira/browse/FLINK-38844
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Reporter: tchivs
> Priority: Minor
> Labels: pull-request-available
>
> h2. Component
> *Flink CDC / Pipeline Connectors / PostgreSQL*
> h2. Summary
> Add metadata column support for PostgreSQL Pipeline Connector
> h2. Description
> h3. Background
> Currently, the PostgreSQL Source Connector supports metadata columns (op_ts,
> database_name, schema_name, table_name, row_kind) through
> {{PostgreSQLReadableMetadata}}, but the PostgreSQL Pipeline Connector
> does not expose these metadata columns to users.
> h3. Problem
> Users cannot access metadata information (such as operation timestamp,
> database name, schema name, table name) when using the PostgreSQL Pipeline
> Connector in their data pipelines. This limits the ability to:
> * Track when data changes occurred
> * Identify the source database/schema/table of records
> * Implement metadata-based routing or filtering logic
> h3. Proposed Solution
> Implement metadata column support for PostgreSQL Pipeline Connector by:
> # *Create metadata column implementations* for the pipeline connector:
> ** {{OpTsMetadataColumn}}: Provides operation timestamp (milliseconds
> since epoch)
> ** {{DatabaseNameMetadataColumn}}: Provides source database name
> ** {{SchemaNameMetadataColumn}}: Provides source schema name
> ** {{TableNameMetadataColumn}}: Provides source table name
> # *Update PostgresDataSource* to expose supported metadata columns via
> the {{supportedMetadataColumns()}} method
> # *Add comprehensive tests* to verify metadata functionality in both
> snapshot and incremental phases
> # *Update documentation* to guide users on how to use metadata columns
> h3. Implementation Details
> h4. Metadata Columns
> The following metadata columns will be supported:
> ||Metadata Key||Data Type||Description||
> |{{op_ts}}|BIGINT NOT NULL|Operation timestamp in milliseconds. Returns 0 for snapshot records and the actual timestamp for incremental records|
> |{{database_name}}|STRING NOT NULL|Name of the source database|
> |{{schema_name}}|STRING NOT NULL|Name of the source schema|
> |{{table_name}}|STRING NOT NULL|Name of the source table|
> *Note:* {{row_kind}} metadata is not included because it requires {{RowData}}
> and cannot be read from {{SourceRecord}} like other metadata columns.
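
To make the table above concrete, here is a minimal sketch of how the four columns could read their values from a per-record metadata map. The interface and class names (MetadataColumnSketch, OpTsColumnSketch, StringColumnSketch) are hypothetical stand-ins, not the Flink CDC API, and treating a missing op_ts entry as a snapshot record is an assumption made only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the connector's metadata column contract. */
interface MetadataColumnSketch {
    String getName();

    Class<?> getJavaClass();

    /** Reads this column's value from the metadata attached to one record. */
    Object read(Map<String, String> metadata);
}

/** op_ts: epoch milliseconds; 0 when no timestamp is present (assumed: snapshot). */
final class OpTsColumnSketch implements MetadataColumnSketch {
    @Override
    public String getName() {
        return "op_ts";
    }

    @Override
    public Class<?> getJavaClass() {
        return Long.class;
    }

    @Override
    public Object read(Map<String, String> metadata) {
        String raw = metadata.get(getName());
        return raw == null ? 0L : Long.parseLong(raw);
    }
}

/** Covers database_name, schema_name and table_name, which are all non-null strings. */
final class StringColumnSketch implements MetadataColumnSketch {
    private final String name;

    StringColumnSketch(String name) {
        this.name = name;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public Class<?> getJavaClass() {
        return String.class;
    }

    @Override
    public Object read(Map<String, String> metadata) {
        String value = metadata.get(name);
        if (value == null) {
            // The table declares these columns NOT NULL, so a missing value is an error.
            throw new IllegalArgumentException("Missing metadata: " + name);
        }
        return value;
    }
}

final class MetadataColumnDemo {
    public static void main(String[] args) {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("database_name", "mydb");
        metadata.put("schema_name", "public");
        metadata.put("table_name", "orders");

        MetadataColumnSketch[] columns = {
            new OpTsColumnSketch(),
            new StringColumnSketch("database_name"),
            new StringColumnSketch("schema_name"),
            new StringColumnSketch("table_name")
        };
        for (MetadataColumnSketch column : columns) {
            System.out.println(column.getName() + " = " + column.read(metadata));
        }
    }
}
```

The three string columns share one class here only to keep the sketch short; the proposal itself lists a separate class per column.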
>
> {code:yaml}
> source:
>   type: postgres
>   hostname: localhost
>   port: 5432
>   username: postgres
>   password: postgres
>   database-name: mydb
>   schema-name: public
>   table-name: orders
> transform:
>   - source-table: public.orders
>     projection: id, order_date, amount, op_ts, database_name, schema_name, table_name
>     description: Include metadata columns in output
> sink:
>   type: doris
>   # ... sink configuration
> {code}
> h3. Expected Output
> The output records will include metadata columns:
> ||id||order_date||amount||op_ts||database_name||schema_name||table_name||
> |1|2024-01-01|100.00|0|mydb|public|orders|
> |2|2024-01-02|200.00|1704182400000|mydb|public|orders|
> *Note:* {{op_ts}} is 0 for snapshot records and contains the actual
> timestamp for incremental records.
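
As a small illustration of the note above, a downstream consumer could interpret op_ts roughly as follows. The class and method names (OpTsInterpretation, describe) are hypothetical and not part of the proposal; the only convention relied on is the one stated in the note, that 0 marks a snapshot record.

```java
import java.time.Instant;

final class OpTsInterpretation {
    /** Describes an op_ts value: 0 means snapshot, anything else is epoch milliseconds. */
    static String describe(long opTs) {
        if (opTs == 0L) {
            return "snapshot";
        }
        return "incremental at " + Instant.ofEpochMilli(opTs);
    }

    public static void main(String[] args) {
        System.out.println(describe(0L));             // snapshot
        System.out.println(describe(1704182400000L)); // incremental at 2024-01-02T08:00:00Z
    }
}
```

The second sample row therefore corresponds to a change made at 08:00 UTC on 2024-01-02.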
> h3. Fix Version
> * 3.6.0 (or next release version)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)