Vinay Sagar Gonabavi created FLINK-39434:
--------------------------------------------

             Summary: Transform table-options changes are not propagated to 
existing downstream tables on savepoint restart
                 Key: FLINK-39434
                 URL: https://issues.apache.org/jira/browse/FLINK-39434
             Project: Flink
          Issue Type: New Feature
          Components: Flink CDC
    Affects Versions: cdc-3.4.0
         Environment: * Flink CDC Version: 3.3+ (issue found on 3.4.0, present 
on master)
 * Flink Version: 1.20+

 
            Reporter: Vinay Sagar Gonabavi


Changing {{table-options}} in a transform definition and restarting from a 
savepoint has no effect on existing downstream tables. The new options are 
silently dropped.
h2. Steps to Reproduce
 # Deploy a pipeline with transform {{table-options}}:

{code:yaml}
transform:
  - source-table: mydb.orders
    table-options:
      contains_si: "false"
{code}
 # Let it create the downstream Paimon table ({{contains_si=false}}).
 # Take a savepoint, change the option to {{contains_si: "true"}}, and restart from the savepoint.
 # Observe: the Paimon table still has {{contains_si=false}}.

h2. Expected Behavior
 * Option changes propagate to the existing downstream table on restart

h2. Actual Behavior

The {{CreateTableEvent}} carrying the new options is blocked by *5 gaps* in the pipeline:
||#||Location||Problem||
|1|{{SchemaUtils.isSchemaChangeEventRedundant()}}|The {{CreateTableEvent}} handler returns {{latestSchema.isPresent()}}, which checks only table existence, not schema equality.|
|2|{{SchemaMergingUtils.getSchemaDifference()}}|Only compares columns; options are ignored.|
|3|{{SchemaChangeEventType}}|No {{AlterTableOptionsEvent}} exists, so no event type can carry option changes.|
|4|{{PaimonMetadataApplier}}|Options are applied only at {{CREATE TABLE}} time with {{ignoreIfExists=true}}. There is no {{ALTER TABLE SET OPTIONS}} handler.|
|5|{{SchemaMergingUtils.getLeastCommonSchema()}}|Uses {{currentSchema.copy(commonColumns)}}, which preserves the old schema's options and discards incoming options in many-to-one routing.|
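
Gap 1 can be illustrated with a minimal, self-contained sketch. It does not use the real Flink CDC classes: {{SketchSchema}} and {{RedundancyCheckSketch}} are hypothetical stand-ins, and the schema is reduced to its options map. It only shows why an existence-only check treats a {{CreateTableEvent}} with changed options as redundant, while an equality-based check does not.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a table schema; reduced to its options map. */
final class SketchSchema {
    final Map<String, String> options;

    SketchSchema(Map<String, String> options) {
        // Defensive copy so later changes to the caller's map do not leak in.
        this.options = Collections.unmodifiableMap(new HashMap<>(options));
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SketchSchema && options.equals(((SketchSchema) o).options);
    }

    @Override
    public int hashCode() {
        return options.hashCode();
    }
}

/** Hypothetical helper contrasting the two redundancy checks. */
final class RedundancyCheckSketch {
    /** Existence-only check: any create event for a known table counts as redundant. */
    static boolean redundantByPresence(Optional<SketchSchema> latest, SketchSchema incoming) {
        return latest.isPresent();
    }

    /** Equality check: redundant only if the stored schema equals the incoming one. */
    static boolean redundantByEquality(Optional<SketchSchema> latest, SketchSchema incoming) {
        return latest.isPresent() && latest.get().equals(incoming);
    }

    public static void main(String[] args) {
        SketchSchema stored = new SketchSchema(Collections.singletonMap("contains_si", "false"));
        SketchSchema changed = new SketchSchema(Collections.singletonMap("contains_si", "true"));
        Optional<SketchSchema> latest = Optional.of(stored);

        System.out.println("presence check drops event: " + redundantByPresence(latest, changed));
        System.out.println("equality check drops event: " + redundantByEquality(latest, changed));
    }
}
```

With the existence-only check the changed schema never reaches the later stages, so gaps 2 to 5 only become visible once this first check lets the event through.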
h2. Proposed Solution
 * *Change {{isSchemaChangeEventRedundant()}}:* make the {{CreateTableEvent}} handler compare schemas via {{Schema.equals()}} (which already includes options) instead of only checking {{latestSchema.isPresent()}}.
 * *Add {{AlterTableOptionsEvent}}:* a new event class, an {{ALTER_TABLE_OPTIONS}} entry in {{SchemaChangeEventType}}, and visitor support.
 * *Detect option diffs:* extend {{getSchemaDifference()}} to emit {{AlterTableOptionsEvent}} when {{beforeSchema.options()}} differs from {{afterSchema.options()}}.
 * *Merge options in many-to-one routing:* update {{getLeastCommonSchema()}} to merge incoming options instead of discarding them.
 * *Handle in sink appliers:* add {{ALTER TABLE SET OPTIONS}} support to {{PaimonMetadataApplier}} (via Paimon's {{SchemaChange.setOption()}} / {{SchemaChange.removeOption()}}), and possibly to other connectors.
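
As a rough sketch of the option-diff step, the following stand-alone Java computes which options would need to be set and which removed when going from the old options map to the new one. {{TableOptionsDiffSketch}} and its members are hypothetical names, not part of Flink CDC or Paimon, and the sketch assumes non-null option keys. In a sink applier, each entry of {{toSet}} would presumably translate to a set-option change and each entry of {{toRemove}} to a remove-option change.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical helper: diff between two table-option maps. */
final class TableOptionsDiffSketch {
    /** Options that are new or whose value changed, keyed by option name. */
    final Map<String, String> toSet;
    /** Option names present before but absent afterwards. */
    final Set<String> toRemove;

    private TableOptionsDiffSketch(Map<String, String> toSet, Set<String> toRemove) {
        this.toSet = Collections.unmodifiableMap(toSet);
        this.toRemove = Collections.unmodifiableSet(toRemove);
    }

    /** True when the two option maps were equal, so no alter event is needed. */
    boolean isEmpty() {
        return toSet.isEmpty() && toRemove.isEmpty();
    }

    static TableOptionsDiffSketch diff(Map<String, String> before, Map<String, String> after) {
        // Sorted collections keep the result deterministic.
        Map<String, String> toSet = new TreeMap<>();
        for (Map.Entry<String, String> e : after.entrySet()) {
            if (!before.containsKey(e.getKey())
                    || !Objects.equals(e.getValue(), before.get(e.getKey()))) {
                toSet.put(e.getKey(), e.getValue());
            }
        }
        Set<String> toRemove = new TreeSet<>(before.keySet());
        toRemove.removeAll(after.keySet());
        return new TableOptionsDiffSketch(toSet, toRemove);
    }

    public static void main(String[] args) {
        Map<String, String> before = new HashMap<>();
        before.put("contains_si", "false");
        before.put("legacy_flag", "x"); // hypothetical option, for illustration only

        Map<String, String> after = new HashMap<>();
        after.put("contains_si", "true");

        TableOptionsDiffSketch d = diff(before, after);
        System.out.println("set=" + d.toSet + ", remove=" + d.toRemove);
    }
}
```

Emitting an event only when {{isEmpty()}} is false would keep restarts with unchanged options free of spurious schema changes.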

h2. Use Case 

Dynamically marking Paimon tables with properties like {{contains_si=true}} 
based on source table metadata, so downstream consumers can identify tables 
with secondary indexes.
h2. Note

I am willing to contribute a PR.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)