[ https://issues.apache.org/jira/browse/FLINK-38828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vinay Sagar Gonabavi updated FLINK-38828:
-----------------------------------------
    Description: 
The `PostTransformOperator` does not persist schema state, so it cannot detect 
projection changes when a Flink CDC job is restarted from a savepoint or 
checkpoint.
 
*Steps to Reproduce*

1. Create a Flink CDC pipeline with a transform projection that does not yet 
include a computed column:
{code:yaml}
transform:
  - source-table: mydb.users
    projection: user_id, username
{code}
2. Start the job and process data (downstream sink schema: `user_id, username`)
3. Create a savepoint
4. Update the pipeline definition to add a computed/transformed column:
{code:yaml}
transform:
  - source-table: mydb.users
    projection: user_id, username, UPPER(username) as username_upper
{code}
5. Restart the job from the savepoint with the updated pipeline definition
 
*Expected Behavior*
 * The `PostTransformOperator` should detect that the projection has changed
 * It should emit an `AddColumnEvent` for the new computed column (`username_upper`)
 * Downstream operators (`SchemaOperator`, `DataSinkWriter`) should receive the schema change events
 * The sink schema should be updated to include the computed column
 * Data events should flow correctly with the new schema, including the computed field
 * The pipeline should continue processing without data loss
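
Once the post-transform column names of a table are restored from state, "detecting that the projection has changed" comes down to a set difference against the names recomputed from the current projection. The following is a minimal sketch under that assumption. `ProjectionDiff` is a hypothetical helper, not a Flink CDC class. It compares column names only, so a real implementation working on Flink CDC `Schema` objects would also have to handle column types and positions.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical helper (not part of Flink CDC): compares the post-transform
 * column names restored from state with the names recomputed from the
 * current projection rules.
 */
final class ProjectionDiff {
    final List<String> added;   // candidates for AddColumnEvent
    final List<String> dropped; // candidates for DropColumnEvent

    private ProjectionDiff(List<String> added, List<String> dropped) {
        this.added = added;
        this.dropped = dropped;
    }

    static ProjectionDiff between(List<String> restored, List<String> current) {
        Set<String> before = new LinkedHashSet<>(restored);
        Set<String> after = new LinkedHashSet<>(current);
        List<String> added = new ArrayList<>();
        for (String column : after) {
            if (!before.contains(column)) {
                added.add(column); // present now, absent in the restored schema
            }
        }
        List<String> dropped = new ArrayList<>();
        for (String column : before) {
            if (!after.contains(column)) {
                dropped.add(column); // present in the restored schema, gone now
            }
        }
        return new ProjectionDiff(added, dropped);
    }

    boolean isEmpty() {
        return added.isEmpty() && dropped.isEmpty();
    }
}
{code}

For the reproduction above, `ProjectionDiff.between(List.of("user_id", "username"), List.of("user_id", "username", "username_upper"))` reports `username_upper` as added and nothing as dropped. That matches the single `AddColumnEvent` the expected behavior asks for.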

*Actual Behavior*
 * The `PostTransformOperator` has no state to compare against the current projection rules
 * No schema change events are generated for the computed column
 * The downstream sink schema remains unchanged (still only `user_id, username`)
 * The computed column `username_upper` is missing from the sink
 * Values computed by the transformation are never written to the sink
 * Users must restart jobs from scratch (losing checkpoint/savepoint state) to get the new schema

 
*Proposed Solution*

Implement operator state management in `PostTransformOperator` to enable 
dynamic projection evolution:

1. Add State Persistence:
 * Store `PostTransformChangeInfo` (tableId, pre-transform schema, post-transform schema) in operator state
 * Implement `initializeState()` to restore schemas on job restart
 * Implement `snapshotState()` to persist schemas at checkpoints

2. Detect Schema Evolution:
 * On restoration, recalculate the post-transform schema using the current projection rules
 * Compare the recalculated schema with the restored schema
 * Detect added columns (including computed columns with UDFs/expressions)
 * Detect removed columns
 * Generate `AddColumnEvent` / `DropColumnEvent` as needed

3. Emit Schema Changes:
 * Queue schema change events for each affected table
 * Emit the queued events before processing the first data event for each table
 * This ensures downstream operators receive schema updates before the data that depends on them

4. Backward Compatibility:
 * Support deserialization of the old state format (for upgrading from versions without state)
 * Use versioned serialization with a magic marker to distinguish formats
 * Gracefully handle missing state (first-time run or upgrade scenario)
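
Items 1 and 4 together describe a snapshot format that can tell its own version apart from older payloads. The following is a minimal sketch resting on two assumptions. First, the snapshot is simplified to a map from table ID to post-transform column names, whereas the proposal stores full `PostTransformChangeInfo` schemas. Second, a legacy payload is taken to be the same body without a header. `TransformStateSerializer`, `MAGIC` and `VERSION` are hypothetical names, not existing Flink CDC code.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical serializer for a simplified snapshot: tableId -> post-transform column names. */
final class TransformStateSerializer {
    // Negative on purpose: an (assumed) legacy payload starts with a non-negative
    // entry count, so it can never be mistaken for the marker.
    static final int MAGIC = 0xCDC05EED;
    static final int VERSION = 1;

    static byte[] serialize(Map<String, List<String>> state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(MAGIC);
            out.writeInt(VERSION);
            writeBody(out, state);
        }
        return bytes.toByteArray();
    }

    static Map<String, List<String>> deserialize(byte[] data) throws IOException {
        if (data == null || data.length == 0) {
            return new LinkedHashMap<>(); // first run, or upgrade from a version without state
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int head = in.readInt();
            if (head != MAGIC) {
                return readBody(in, head); // assumed legacy layout: head is the entry count
            }
            int version = in.readInt();
            if (version != VERSION) {
                throw new IOException("Unsupported state version: " + version);
            }
            return readBody(in, in.readInt());
        }
    }

    private static void writeBody(DataOutputStream out, Map<String, List<String>> state) throws IOException {
        out.writeInt(state.size());
        for (Map.Entry<String, List<String>> entry : state.entrySet()) {
            out.writeUTF(entry.getKey());
            out.writeInt(entry.getValue().size());
            for (String column : entry.getValue()) {
                out.writeUTF(column);
            }
        }
    }

    private static Map<String, List<String>> readBody(DataInputStream in, int entries) throws IOException {
        Map<String, List<String>> state = new LinkedHashMap<>();
        for (int i = 0; i < entries; i++) {
            String tableId = in.readUTF();
            int count = in.readInt();
            List<String> columns = new ArrayList<>(count);
            for (int j = 0; j < count; j++) {
                columns.add(in.readUTF());
            }
            state.put(tableId, columns);
        }
        return state;
    }
}
{code}

Because the marker is negative, the first `int` alone decides which layout to read. In the operator, `snapshotState()` would write these bytes to operator state. `initializeState()` would read them back and fall back to an empty map when nothing was restored.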

Unlike source columns (which come from `CreateTableEvent`), computed columns 
are:
 * Defined entirely in the projection (e.g., `UPPER(name) as name_upper`)
 * Calculated by `PostTransformOperator`
 * Not present in the source schema
 * Require explicit schema change events to appear downstream
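
Computed columns reach the sink only through explicit schema change events, so the emission order in item 3 matters. The queued events for a table must precede that table's first data event after the restore. The following is a minimal sketch of that ordering. It assumes events can be modeled as strings and uses a list in place of the operator's output. `PendingSchemaChanges` and its methods are hypothetical.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not the Flink CDC API): schema changes detected on restore
 * are parked per table and emitted ahead of that table's first data event.
 * A real operator would emit AddColumnEvent / DropColumnEvent and data events
 * through its output instead of appending strings to a list.
 */
final class PendingSchemaChanges {
    private final Map<String, Deque<String>> pending = new HashMap<>();
    private final List<String> emitted = new ArrayList<>(); // stands in for the operator output

    void enqueue(String tableId, String schemaChange) {
        pending.computeIfAbsent(tableId, k -> new ArrayDeque<>()).add(schemaChange);
    }

    void processData(String tableId, String dataEvent) {
        Deque<String> queued = pending.remove(tableId); // flushed at most once per table
        if (queued != null) {
            emitted.addAll(queued); // schema changes first, in detection order
        }
        emitted.add(dataEvent);
    }

    List<String> emitted() {
        return emitted;
    }
}
{code}

Suppose `AddColumn(username_upper)` is enqueued for `mydb.users`, and then one row each arrives for `mydb.orders` and `mydb.users`. The output is the orders row, then the schema change, then the users row. Flushing lazily has one trade-off: if a table receives no traffic after the restore, its schema change stays queued until its next data event arrives.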

 

Note: I have a working implementation tested against a real-world pipeline. 
Happy to contribute it after community feedback on the approach.


> PostTransformOperator cannot detect dynamic projection updates when 
> restarting from checkpoint/savepoint
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38828
>                 URL: https://issues.apache.org/jira/browse/FLINK-38828
>             Project: Flink
>          Issue Type: New Feature
>          Components: Flink CDC
>    Affects Versions: cdc-3.4.0
>         Environment:
>  * Flink CDC Version: 3.3+ (issue originally found on 3.4.0/06581fc, verified still present on master f5204243)
>  * Flink Version: 1.20+
>  * Affected Operators: PostTransformOperator
>  * Affected Files:
>  ** 
> `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java`
>  ** 
> `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformChangeInfo.java`
> **Note:** I have a working implementation validated with real-world use 
> cases. Happy to contribute after community feedback on the approach.
>            Reporter: Vinay Sagar Gonabavi
>            Priority: Major
>              Labels: Flink-CDC
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
