[ 
https://issues.apache.org/jira/browse/FLINK-39840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tianfy updated FLINK-39840:
---------------------------
    Description: 
The Oracle pipeline connector should preserve the full Oracle table identifier 
when emitting pipeline events.

The main correctness problem behind this change is a schema/table structure 
mix-up in Oracle deployments that expose multiple schemas. Oracle table 
metadata is uniquely identified by database.schema.table. If Flink CDC drops 
the database/catalog part or uses only schema.table on some paths, same-named 
Oracle tables from different schemas can be resolved through different 
identifiers. When those tables have different column layouts, CreateTableEvent, 
SchemaChangeEvent, and data change records may be associated with the wrong 
table schema. This can surface as fields from another table appearing in 
CreateTableEvent, incorrect schema evolution, or type conversion failures while 
writing downstream data.

Currently some paths convert Debezium table IDs to Flink CDC table IDs by 
dropping the database/catalog part and only keeping schema.table. Data change 
records, schema change records, and CreateTableEvent can therefore use 
different table identifiers for the same Oracle table.

This also breaks downstream routing in jobs that rely on database.schema.table 
identifiers, but the primary correctness issue is that the connector can bind 
records to the wrong Oracle table structure when same-named tables exist in 
different schemas.
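
The identifier mismatch described above can be illustrated with a minimal, 
self-contained sketch. It does not use the real Flink CDC or Debezium classes: 
the class IdentifierMismatchSketch, its methods, and the names ORCLPDB1, HR and 
EMP are hypothetical stand-ins chosen for illustration. It assumes one path 
registers the table schema under database.schema.table while another path 
looks it up with schema.table only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the connector's schema bookkeeping; not real Flink CDC code.
final class IdentifierMismatchSketch {

    // Full identifier: database.schema.table.
    static String fullId(String database, String schema, String table) {
        return database + "." + schema + "." + table;
    }

    // Truncated identifier: the database/catalog part is dropped.
    static String shortId(String schema, String table) {
        return schema + "." + table;
    }

    // Simulates the path that emits CreateTableEvent: the column list is
    // registered under the full identifier (example names are assumptions).
    static Map<String, List<String>> registerCreateTable() {
        Map<String, List<String>> schemas = new HashMap<>();
        schemas.put(fullId("ORCLPDB1", "HR", "EMP"), Arrays.asList("ID", "NAME"));
        return schemas;
    }

    public static void main(String[] args) {
        Map<String, List<String>> schemas = registerCreateTable();
        // A path that dropped the database part cannot find the registered schema.
        System.out.println(schemas.get(shortId("HR", "EMP")));
        // A path that keeps the full identifier finds it.
        System.out.println(schemas.get(fullId("ORCLPDB1", "HR", "EMP")));
    }
}
```

In this sketch the truncated key simply misses. In a real job the mismatch 
could presumably also resolve to a different table's entry, which is the 
mix-up this issue describes.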

Expected behavior:
* data change events use database.schema.table
* schema change events use database.schema.table
* CreateTableEvent uses the same database.schema.table identifier
* table schema lookup and schema evolution use the same full identifier

The fix should use the table identifier from the SourceRecord source struct 
where possible instead of parsing the Kafka topic name, and should keep the 
catalog/database part when converting Debezium TableId to Flink CDC TableId.
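
A rough sketch of the proposed direction follows, under stated assumptions. 
It is not the actual patch: FullTableIdSketch and its methods are hypothetical, 
a plain Map stands in for the SourceRecord source struct, the keys "db", 
"schema" and "table" are assumed field names, and the topic is assumed to look 
like <prefix>.<schema>.<table>.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; the real connector works on Kafka Connect
// Struct and Debezium/Flink CDC TableId objects rather than maps and strings.
final class FullTableIdSketch {

    // Builds the identifier from a map standing in for the record's source
    // struct. The keys "db", "schema" and "table" are assumptions.
    static String fromSourceStruct(Map<String, String> source) {
        return join(source.get("db"), source.get("schema"), source.get("table"));
    }

    // Topic-parsing approach, shown for contrast. With a topic shaped like
    // <prefix>.<schema>.<table> there is no database name to recover.
    static String fromTopicName(String topic) {
        String[] parts = topic.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Unexpected topic name: " + topic);
        }
        return join(null, parts[parts.length - 2], parts[parts.length - 1]);
    }

    // Joins the non-null parts with dots, so a catalog/database that is
    // present is always kept in the resulting identifier.
    static String join(String database, String schema, String table) {
        StringBuilder sb = new StringBuilder();
        for (String part : new String[] {database, schema, table}) {
            if (part != null) {
                if (sb.length() > 0) {
                    sb.append('.');
                }
                sb.append(part);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> source = new HashMap<>();
        source.put("db", "ORCLPDB1");
        source.put("schema", "HR");
        source.put("table", "EMP");
        System.out.println(fromSourceStruct(source));        // ORCLPDB1.HR.EMP
        System.out.println(fromTopicName("server1.HR.EMP")); // HR.EMP
    }
}
```

The point of the sketch is that the source struct carries all three parts, 
while the topic-derived identifier cannot include the database, so the two 
paths disagree unless both use the source struct.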

  was:
The Oracle pipeline connector should preserve the full Oracle table identifier 
when emitting pipeline events.

Currently some paths convert Debezium table IDs to Flink CDC table IDs by 
dropping the database/catalog part and only keeping schema.table. Data change 
records, schema change records, and CreateTableEvent can therefore use 
different table identifiers for the same Oracle table.

This can break downstream routing and schema evolution in jobs that rely on 
database.schema.table identifiers.

Expected behavior:
* data change events use database.schema.table
* schema change events use database.schema.table
* CreateTableEvent uses the same database.schema.table identifier

The fix should use the table identifier from the SourceRecord source struct 
where possible instead of parsing the Kafka topic name, and should keep the 
catalog/database part when converting Debezium TableId to Flink CDC TableId.


> Oracle pipeline connector emits incomplete table identifiers for data and 
> schema change events
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39840
>                 URL: https://issues.apache.org/jira/browse/FLINK-39840
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: tianfy
>            Priority: Major
>              Labels: pull-request-available
>
> The Oracle pipeline connector should preserve the full Oracle table 
> identifier when emitting pipeline events.
>
> The main correctness problem behind this change is a schema/table structure 
> mix-up in Oracle deployments that expose multiple schemas. Oracle table 
> metadata is uniquely identified by database.schema.table. If Flink CDC drops 
> the database/catalog part or uses only schema.table on some paths, same-named 
> Oracle tables from different schemas can be resolved through different 
> identifiers. When those tables have different column layouts, 
> CreateTableEvent, SchemaChangeEvent, and data change records may be 
> associated with the wrong table schema. This can surface as fields from 
> another table appearing in CreateTableEvent, incorrect schema evolution, or 
> type conversion failures while writing downstream data.
>
> Currently some paths convert Debezium table IDs to Flink CDC table IDs by 
> dropping the database/catalog part and only keeping schema.table. Data change 
> records, schema change records, and CreateTableEvent can therefore use 
> different table identifiers for the same Oracle table.
>
> This also breaks downstream routing in jobs that rely on 
> database.schema.table identifiers, but the primary correctness issue is that 
> the connector can bind records to the wrong Oracle table structure when 
> same-named tables exist in different schemas.
>
> Expected behavior:
> * data change events use database.schema.table
> * schema change events use database.schema.table
> * CreateTableEvent uses the same database.schema.table identifier
> * table schema lookup and schema evolution use the same full identifier
>
> The fix should use the table identifier from the SourceRecord source struct 
> where possible instead of parsing the Kafka topic name, and should keep the 
> catalog/database part when converting Debezium TableId to Flink CDC TableId.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
