Vinay Sagar Gonabavi created FLINK-38828:
--------------------------------------------

             Summary: PostTransformOperator cannot detect dynamic projection 
updates when restarting from checkpoint/savepoint
                 Key: FLINK-38828
                 URL: https://issues.apache.org/jira/browse/FLINK-38828
             Project: Flink
          Issue Type: New Feature
          Components: Flink CDC
    Affects Versions: cdc-3.4.0
         Environment: * Flink CDC Version: 3.3+ (issue originally found on 
3.4.0/06581fc, verified still present on master f5204243)
 * Flink Version: 1.20+
 * Affected Operators: PostTransformOperator
 * Affected Files:
 ** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java`
 ** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformChangeInfo.java`
**Note:** I have a working implementation validated with real-world use cases. 
Happy to contribute after community feedback on the approach.
            Reporter: Vinay Sagar Gonabavi


The `PostTransformOperator` does not persist schema state, so it cannot detect 
projection changes when a Flink CDC job is restarted from a savepoint or 
checkpoint.
 
*Steps to Reproduce*

1. Create a Flink CDC pipeline with a transform projection that contains only 
source columns:
```
transform:
  - source-table: mydb.users
    projection: user_id, username
```
2. Start the job and process data (downstream sink schema: `user_id, username`)
3. Create a savepoint
4. Update the pipeline definition to add a computed/transformed column:
```
transform:
  - source-table: mydb.users
    projection: user_id, username, UPPER(username) as username_upper
```
5. Restart the job from the savepoint with the updated pipeline definition
 
*Expected Behavior*
 * The `PostTransformOperator` should detect that the 
projection has changed
 * It should emit `AddColumnEvent` for the new computed column 
(`username_upper`)
 * Downstream operators (SchemaOperator, DataSinkWriter) should receive the 
schema change events
 * Sink schema should be updated to include the computed column
 * Data events should flow correctly with the new schema including the computed 
field
 * The pipeline should continue processing without data loss

*Actual Behavior*
 * The `PostTransformOperator` has no state to compare against current 
projection rules
 * No schema change events are generated for the computed column
 * Downstream sink schema remains unchanged (still only `user_id, username`)
 * The computed column `username_upper` is missing from the sink
 * Values computed by the transformation never reach the sink
 * Users must restart jobs from scratch (losing checkpoint/savepoint state) to 
get the new schema
 
*Proposed Solution*

Implement operator state management in `PostTransformOperator` to enable 
dynamic projection evolution:

1. Add State Persistence:
 * Store `PostTransformChangeInfo` (tableId, 
pre-transform schema, post-transform schema) in operator state
 * Implement `initializeState()` to restore schemas on job restart
 * Implement `snapshotState()` to persist schemas at checkpoints



2. Detect Schema Evolution:
 * On restoration, recalculate post-transform schema 
using current projection rules
 * Compare recalculated schema with restored schema
 * Detect added columns (including computed columns with UDFs/expressions)
 * Detect removed columns
 * Generate `AddColumnEvent` / `DropColumnEvent` as needed

3. Emit Schema Changes:
 * Queue schema change events for each affected table
 * Emit queued events before processing first data event for each table
 * This ensures downstream operators receive schema updates before data

4. Backward Compatibility:
 * Support deserialization of old state format (for upgrading from versions 
without state)
 * Use versioned serialization with magic marker to distinguish formats
 * Gracefully handle missing state (first-time run or upgrade scenario)
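
To make the backward-compatibility point concrete, here is a minimal sketch of versioned serialization with a magic marker. It is not taken from the working implementation mentioned above. The class name `VersionedStateCodec`, the marker value, and the assumed legacy layout (a bare `writeUTF` payload with no header) are all hypothetical, and a plain string stands in for a serialized `PostTransformChangeInfo`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Optional;

/** Hypothetical codec for illustration only; not part of Flink CDC. */
final class VersionedStateCodec {
    static final int MAGIC = 0xC0DE5EED;   // assumed marker value
    static final int CURRENT_VERSION = 1;  // assumed version number

    /** Writes the new format: magic marker, version, then the payload. */
    static byte[] write(String payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(MAGIC);
            out.writeInt(CURRENT_VERSION);
            out.writeUTF(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Writes the assumed legacy format: the payload with no header. */
    static byte[] writeLegacy(String payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads either format; returns empty when there is no state at all. */
    static Optional<String> read(byte[] state) {
        if (state == null || state.length == 0) {
            return Optional.empty(); // first-time run or upgrade without state
        }
        boolean versioned = state.length >= 8 && ByteBuffer.wrap(state).getInt() == MAGIC;
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(state))) {
            if (versioned) {
                in.readInt(); // skip the magic marker
                int version = in.readInt();
                if (version != CURRENT_VERSION) {
                    throw new IOException("Unsupported state version: " + version);
                }
            }
            return Optional.of(in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(read(write("mydb.users")));
        System.out.println(read(writeLegacy("mydb.users")));
        System.out.println(read(new byte[0]));
    }
}
```

A marker check like this is only a heuristic: a legacy payload could in principle start with the same four bytes, so a real implementation would need a marker that the old format cannot produce.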

Unlike source columns (which come from `CreateTableEvent`), computed columns:
 * Are defined entirely in the projection (e.g., `UPPER(name) as name_upper`)
 * Are calculated by `PostTransformOperator`
 * Are not present in the source schema
 * Require explicit schema change events to appear downstream
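
The detect-and-emit steps of the proposal can be sketched for the `username_upper` case as follows. This is an illustration under stated assumptions, not the proposed patch: `ProjectionChangeTracker` is a hypothetical class, schemas are reduced to lists of column names, and events are plain strings rather than Flink CDC's `AddColumnEvent` / `DropColumnEvent` objects.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical helper for illustration only; not part of Flink CDC. */
final class ProjectionChangeTracker {
    // Schema change events waiting to be emitted, keyed by table id.
    private final Map<String, Queue<String>> pendingByTable = new HashMap<>();

    /** Compares restored and recalculated column lists and queues one event per difference. */
    void onRestore(String tableId, List<String> restoredColumns, List<String> currentColumns) {
        Queue<String> pending = new ArrayDeque<>();
        for (String column : currentColumns) {
            if (!restoredColumns.contains(column)) {
                pending.add("AddColumnEvent(" + column + ")");
            }
        }
        for (String column : restoredColumns) {
            if (!currentColumns.contains(column)) {
                pending.add("DropColumnEvent(" + column + ")");
            }
        }
        if (!pending.isEmpty()) {
            pendingByTable.put(tableId, pending);
        }
    }

    /** Returns what to emit for one data event: queued schema changes first, then the event. */
    List<String> onDataEvent(String tableId, String dataEvent) {
        List<String> out = new ArrayList<>();
        Queue<String> pending = pendingByTable.remove(tableId);
        if (pending != null) {
            out.addAll(pending);
        }
        out.add(dataEvent);
        return out;
    }

    public static void main(String[] args) {
        ProjectionChangeTracker tracker = new ProjectionChangeTracker();
        tracker.onRestore(
                "mydb.users",
                List.of("user_id", "username"),
                List.of("user_id", "username", "username_upper"));
        System.out.println(tracker.onDataEvent("mydb.users", "insert#1"));
        System.out.println(tracker.onDataEvent("mydb.users", "insert#2"));
    }
}
```

In this sketch the first data event for `mydb.users` is preceded by the queued add-column event, and later events pass through unchanged. Comparing by column name only is a simplification; a change to a computed column's expression or type would need a richer comparison.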



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
