Vinay Sagar Gonabavi created FLINK-39434:
--------------------------------------------
Summary: Transform table-options changes are not propagated to
existing downstream tables on savepoint restart
Key: FLINK-39434
URL: https://issues.apache.org/jira/browse/FLINK-39434
Project: Flink
Issue Type: New Feature
Components: Flink CDC
Affects Versions: cdc-3.4.0
Environment: * Flink CDC Version: 3.3+ (issue found on 3.4.0, present
on master)
* Flink Version: 1.20+
Reporter: Vinay Sagar Gonabavi
Changing {{table-options}} in a transform definition and restarting from a
savepoint has no effect on existing downstream tables. The new options are
silently dropped.
h2. Steps to Reproduce
# Deploy a pipeline with transform {{table-options}}:
{code:yaml}
transform:
  - source-table: mydb.orders
    table-options:
      contains_si: "false"
{code}
# Let it create the downstream Paimon table ({{contains_si=false}}).
# Take a savepoint, change the option to {{contains_si: "true"}}, and restart
from the savepoint.
# Observe: the Paimon table still has {{contains_si=false}}.
h2. Expected Behavior
* Option changes propagate to the existing downstream table on restart
h2. Actual Behavior
The option change is blocked by *5 gaps* in the pipeline:
||#||Location||Problem||
|1|{{SchemaUtils.isSchemaChangeEventRedundant()}}|The {{CreateTableEvent}} handler returns {{latestSchema.isPresent()}}, which checks only that the table exists, not that the schemas are equal.|
|2|{{SchemaMergingUtils.getSchemaDifference()}}|Compares only columns; options are ignored.|
|3|{{SchemaChangeEventType}}|No {{AlterTableOptionsEvent}} exists, so no event type can carry option changes.|
|4|{{PaimonMetadataApplier}}|Options are applied only at {{CREATE TABLE}} time with {{ignoreIfExists=true}}. There is no {{ALTER TABLE SET OPTIONS}} handler.|
|5|{{SchemaMergingUtils.getLeastCommonSchema()}}|Uses {{currentSchema.copy(commonColumns)}}, which preserves the old schema's options and discards incoming options in many-to-one routing.|
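Gap 1 can be illustrated with a small, self-contained model. This is only a sketch: {{SimpleSchema}}, {{redundantByExistence()}} and {{redundantByEquality()}} are hypothetical stand-ins invented for illustration and are not the actual Flink CDC classes or methods. It contrasts an existence-only check with an equality-based one for a schema whose options changed across a restart.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class RedundancyCheckSketch {

    // Hypothetical stand-in for a table schema: column names plus table options.
    record SimpleSchema(List<String> columns, Map<String, String> options) {}

    // Existence-only check: any create-table event for a known table is
    // treated as redundant, regardless of what the incoming schema contains.
    static boolean redundantByExistence(Optional<SimpleSchema> latest, SimpleSchema incoming) {
        return latest.isPresent();
    }

    // Equality-based check: redundant only when the stored schema equals the
    // incoming one. Record equality compares both columns and options.
    static boolean redundantByEquality(Optional<SimpleSchema> latest, SimpleSchema incoming) {
        return latest.isPresent() && latest.get().equals(incoming);
    }

    public static void main(String[] args) {
        SimpleSchema stored =
                new SimpleSchema(List.of("id", "amount"), Map.of("contains_si", "false"));
        SimpleSchema afterRestart =
                new SimpleSchema(List.of("id", "amount"), Map.of("contains_si", "true"));

        // The existence-only check swallows the changed options.
        System.out.println(redundantByExistence(Optional.of(stored), afterRestart)); // true
        // The equality-based check lets the event through.
        System.out.println(redundantByEquality(Optional.of(stored), afterRestart));  // false
    }
}
```

With the existence-only variant the restarted pipeline's event is classified as redundant even though the options differ, which matches the silent drop described above.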
h2. Proposed Solution
* *Change {{isSchemaChangeEventRedundant()}} -* Make the {{CreateTableEvent}}
handler compare schemas via {{Schema.equals()}} (which already includes
options) instead of only checking {{latestSchema.isPresent()}}.
* *Add {{AlterTableOptionsEvent}} -* New event class, an
{{ALTER_TABLE_OPTIONS}} entry in {{SchemaChangeEventType}}, and visitor support.
* *Detect option diffs -* Extend {{getSchemaDifference()}} to emit
{{AlterTableOptionsEvent}} when {{beforeSchema.options()}} differs from
{{afterSchema.options()}}.
* *Merge options in many-to-one routing -* Update {{getLeastCommonSchema()}}
to merge incoming options instead of discarding them.
* *Handle in sink appliers -* Add {{ALTER TABLE SET OPTIONS}} support to
{{PaimonMetadataApplier}} (via Paimon's {{SchemaChange.setOption()}} /
{{removeOption()}}), and possibly to other connectors.
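The option-diff and merge steps above could look roughly like the following. This is a minimal sketch on plain maps, not the proposed implementation: the class and method names are hypothetical, the {{owner}} option is made up for the example, and the "incoming value wins" merge policy is an assumption that the issue does not specify.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

public class OptionDiffSketch {

    // Options that are new or changed in 'after'. In a sink applier these
    // would correspond to "set option" changes.
    static Map<String, String> optionsToSet(Map<String, String> before, Map<String, String> after) {
        Map<String, String> toSet = new TreeMap<>();
        for (Map.Entry<String, String> e : after.entrySet()) {
            if (!Objects.equals(e.getValue(), before.get(e.getKey()))) {
                toSet.put(e.getKey(), e.getValue());
            }
        }
        return toSet;
    }

    // Option keys present in 'before' but missing from 'after'. These would
    // correspond to "remove option" changes. Keys are returned in sorted order.
    static List<String> optionsToRemove(Map<String, String> before, Map<String, String> after) {
        List<String> removed = new ArrayList<>();
        for (String key : new TreeMap<>(before).keySet()) {
            if (!after.containsKey(key)) {
                removed.add(key);
            }
        }
        return removed;
    }

    // Many-to-one merge. Assumed policy: keep current options and let
    // incoming values win on conflicting keys.
    static Map<String, String> mergeOptions(Map<String, String> current, Map<String, String> incoming) {
        Map<String, String> merged = new TreeMap<>(current);
        merged.putAll(incoming);
        return merged;
    }

    public static void main(String[] args) {
        // "owner" is a made-up option used only for this example.
        Map<String, String> before = Map.of("contains_si", "false", "owner", "team-a");
        Map<String, String> after = Map.of("contains_si", "true");

        System.out.println(optionsToSet(before, after));    // {contains_si=true}
        System.out.println(optionsToRemove(before, after)); // [owner]
        System.out.println(mergeOptions(before, after));    // {contains_si=true, owner=team-a}
    }
}
```

Whether removed keys should actually be dropped from the downstream table, and which side wins in a many-to-one merge, are design decisions that would need to be settled in the PR.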
h2. Use Case
Dynamically marking Paimon tables with properties like {{contains_si=true}}
based on source table metadata, so downstream consumers can identify tables
with secondary indexes.
h2. Note
I am willing to contribute a PR.