[ https://issues.apache.org/jira/browse/FLINK-38828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vinay Sagar Gonabavi updated FLINK-38828:
-----------------------------------------
    Environment: 
* Flink CDC Version: 3.3+ (issue originally found on 3.4.0/06581fc, verified still present on master f5204243)
* Flink Version: 1.20+
* Affected Operators: PostTransformOperator
* Affected Files:
** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java`
** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformChangeInfo.java`

  was:
* Flink CDC Version: 3.3+ (issue originally found on 3.4.0/06581fc, verified still present on master f5204243)
* Flink Version: 1.20+
* Affected Operators: PostTransformOperator
* Affected Files:
** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java`
** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformChangeInfo.java`
**Note:** I have a working implementation validated with real-world use cases. Happy to contribute after community feedback on the approach.


> PostTransformOperator needs to detect dynamic projection updates when restarting from checkpoint/savepoint
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38828
>                 URL: https://issues.apache.org/jira/browse/FLINK-38828
>             Project: Flink
>          Issue Type: New Feature
>          Components: Flink CDC
>    Affects Versions: cdc-3.4.0
>         Environment: * Flink CDC Version: 3.3+ (issue originally found on 3.4.0/06581fc, verified still present on master f5204243)
>  * Flink Version: 1.20+
>  * Affected Operators: PostTransformOperator
>  * Affected Files:
>  ** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java`
>  ** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformChangeInfo.java`
>            Reporter: Vinay Sagar Gonabavi
>            Priority: Major
>              Labels: Flink-CDC
>
> The `PostTransformOperator` does not persist schema state, which prevents dynamic projection updates when Flink CDC jobs are restarted from savepoints or checkpoints.
>  
> *Steps to Reproduce*
> 1. Create a Flink CDC pipeline with a transform projection that selects only source columns:
> {code:yaml}
> transform:
>   - source-table: mydb.users
>     projection: user_id, username
> {code}
> 2. Start the job and process data (downstream sink schema: `user_id, username`)
> 3. Create a savepoint
> 4. Update the pipeline definition to add a computed/transformed column:
> {code:yaml}
> transform:
>   - source-table: mydb.users
>     projection: user_id, username, UPPER(username) as username_upper
> {code}
> 5. Restart the job from the savepoint with the updated pipeline definition
>  
> *Expected Behavior*
>  * The `PostTransformOperator` should detect that the projection has changed
>  * It should emit an `AddColumnEvent` for the new computed column (`username_upper`)
>  * Downstream operators (SchemaOperator, DataSinkWriter) should receive the schema change events
>  * The sink schema should be updated to include the computed column
>  * Data events should flow correctly with the new schema, including the computed field
>  * The pipeline should continue processing without data loss
>  
> *Actual Behavior*
>  * The `PostTransformOperator` has no state to compare against the current projection rules
>  * No schema change events are generated for the computed column
>  * The downstream sink schema remains unchanged (still only `user_id, username`)
>  * The computed column `username_upper` is missing from the sink
>  * Values computed by the transformation are not written to the sink
>  * Users must restart jobs from scratch (losing checkpoint/savepoint state) to get the new schema
>  
> *Proposed Solution*
> Implement operator state management in `PostTransformOperator` to enable dynamic projection evolution:
> 1. Add State Persistence:
>  * Store `PostTransformChangeInfo` (tableId, pre-transform schema, post-transform schema) in operator state
>  * Implement `initializeState()` to restore schemas on job restart
>  * Implement `snapshotState()` to persist schemas at checkpoints
> 2. Detect Schema Evolution:
>  * On restoration, recalculate the post-transform schema using the current projection rules
>  * Compare the recalculated schema with the restored schema
>  * Detect added columns (including computed columns with UDFs/expressions)
>  * Detect removed columns
>  * Generate `AddColumnEvent` / `DropColumnEvent` as needed
> 3. Emit Schema Changes:
>  * Queue schema change events for each affected table
>  * Emit the queued events before processing the first data event for each table
>  * This ensures downstream operators receive schema updates before data
> 4. Backward Compatibility:
>  * Support deserialization of the old state format (for upgrading from versions without this state)
>  * Use versioned serialization with a magic marker to distinguish formats
>  * Gracefully handle missing state (first-time run or upgrade scenario)
>  
> Unlike source columns (which come from `CreateTableEvent`), computed columns:
>  * Are defined entirely in the projection (e.g., `UPPER(name) as name_upper`)
>  * Are calculated by `PostTransformOperator`
>  * Are not present in the source schema
>  * Require explicit schema change events to appear downstream
>  
> Note: I have a working implementation tested against a real-world pipeline. Happy to contribute after community feedback on the approach.
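
Steps 2 and 3 of the proposed solution above (detect added/removed columns on restore, then emit the queued changes before a table's first data event) can be illustrated with the following framework-free Java sketch. It is not Flink CDC code: `ProjectionEvolutionSketch`, `Column`, and `SchemaChange` are hypothetical names, plain column lists stand in for Flink CDC schemas, and `SchemaChange` stands in for `AddColumnEvent` / `DropColumnEvent`. Columns are compared by name only, so type changes are out of scope here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Framework-free sketch of proposed steps 2 and 3 (all names are hypothetical).
 * Column lists stand in for Flink CDC schemas; SchemaChange stands in for
 * AddColumnEvent / DropColumnEvent, which carry more detail (e.g. column positions).
 */
final class ProjectionEvolutionSketch {

    /** A post-transform column, e.g. ("username_upper", "STRING"). */
    record Column(String name, String type) {}

    /** Stand-in for an AddColumnEvent ("ADD") or a DropColumnEvent ("DROP"). */
    record SchemaChange(String tableId, String kind, String column) {}

    /** Schema changes waiting to be emitted, keyed by table id. */
    private final Map<String, Deque<SchemaChange>> pending = new HashMap<>();

    /**
     * Step 2: compare the restored post-transform schema with the one recalculated
     * from the current projection rules and queue one change per added or removed
     * column. Type changes are deliberately ignored in this sketch.
     */
    void onRestore(String tableId, List<Column> restored, List<Column> recalculated) {
        Set<String> before = names(restored);
        Set<String> after = names(recalculated);
        Deque<SchemaChange> queue = new ArrayDeque<>();
        for (Column c : recalculated) {
            if (!before.contains(c.name())) {
                queue.add(new SchemaChange(tableId, "ADD", c.name()));
            }
        }
        for (Column c : restored) {
            if (!after.contains(c.name())) {
                queue.add(new SchemaChange(tableId, "DROP", c.name()));
            }
        }
        if (!queue.isEmpty()) {
            pending.put(tableId, queue);
        }
    }

    /**
     * Step 3: before a table's first data record is forwarded, flush that table's
     * queued schema changes so downstream operators see them ahead of the data.
     */
    List<Object> processData(String tableId, Object dataRecord) {
        List<Object> out = new ArrayList<>();
        Deque<SchemaChange> queue = pending.remove(tableId);
        if (queue != null) {
            out.addAll(queue);
        }
        out.add(dataRecord);
        return out;
    }

    private static Set<String> names(List<Column> columns) {
        Set<String> result = new HashSet<>();
        for (Column c : columns) {
            result.add(c.name());
        }
        return result;
    }
}
```

Applied to the reproduction scenario, restoring `user_id, username` against the recalculated `user_id, username, username_upper` queues a single ADD for `username_upper`, which goes out ahead of the first `mydb.users` record; later records and tables without changes pass straight through.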

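Steps 1 and 4 of the proposed solution above (persist `PostTransformChangeInfo`, using versioned serialization with a magic marker so older state can still be read) could look roughly like the following, again as a framework-free sketch rather than the actual implementation. `ChangeInfoStateSketch` and `ChangeInfo` are hypothetical; `ChangeInfo` is a simplified stand-in for `PostTransformChangeInfo` (column names instead of full schemas), and both the marker value and the "legacy" layout are assumptions made for illustration. In the operator, the serialized bytes would live in Flink operator state, written in `snapshotState()` and read back in `initializeState()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Framework-free sketch of proposed steps 1 and 4; every name here is hypothetical. */
final class ChangeInfoStateSketch {

    // Hypothetical marker. A legacy entry starts with a writeUTF length prefix,
    // so it could only begin with 0xFFFF if its table id were 65,535 bytes long.
    static final int MAGIC = 0xFFFFCDC3;
    static final int CURRENT_VERSION = 1;

    /** Simplified stand-in for PostTransformChangeInfo: column names instead of schemas. */
    record ChangeInfo(String tableId, List<String> preColumns, List<String> postColumns) {}

    /** Versioned layout that snapshotState() would store: MAGIC, version, payload. */
    static byte[] serialize(ChangeInfo info) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(MAGIC);
            out.writeInt(CURRENT_VERSION);
            out.writeUTF(info.tableId());
            writeColumns(out, info.preColumns());
            writeColumns(out, info.postColumns());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Hypothetical legacy layout (no marker, post-transform columns only). */
    static byte[] serializeLegacy(String tableId, List<String> postColumns) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(tableId);
            writeColumns(out, postColumns);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads either layout; the first four bytes decide which one it is. */
    static ChangeInfo deserialize(byte[] data) {
        boolean versioned = data.length >= 4 && ByteBuffer.wrap(data).getInt() == MAGIC;
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            if (!versioned) {
                String tableId = in.readUTF();
                return new ChangeInfo(tableId, List.of(), readColumns(in)); // no pre-transform schema
            }
            in.readInt(); // skip MAGIC
            int version = in.readInt();
            if (version != CURRENT_VERSION) {
                throw new IllegalStateException("Unsupported state version: " + version);
            }
            String tableId = in.readUTF();
            List<String> pre = readColumns(in);
            return new ChangeInfo(tableId, pre, readColumns(in));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** What initializeState() would do; an empty map means a first run or an upgrade without state. */
    static Map<String, ChangeInfo> restore(Iterable<byte[]> stateEntries) {
        Map<String, ChangeInfo> byTable = new HashMap<>();
        for (byte[] entry : stateEntries) {
            ChangeInfo info = deserialize(entry);
            byTable.put(info.tableId(), info);
        }
        return byTable;
    }

    private static void writeColumns(DataOutputStream out, List<String> columns) throws IOException {
        out.writeInt(columns.size());
        for (String column : columns) {
            out.writeUTF(column);
        }
    }

    private static List<String> readColumns(DataInputStream in) throws IOException {
        int count = in.readInt();
        List<String> columns = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            columns.add(in.readUTF());
        }
        return columns;
    }
}
```

The marker is checked before anything else is read, so entries without it fall through to the legacy decoding path, and an empty `restore` result is treated as a first run rather than as an error.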


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
