Vinay Sagar Gonabavi created FLINK-38828:
--------------------------------------------
Summary: PostTransformOperator cannot detect dynamic projection
updates when restarting from checkpoint/savepoint
Key: FLINK-38828
URL: https://issues.apache.org/jira/browse/FLINK-38828
Project: Flink
Issue Type: New Feature
Components: Flink CDC
Affects Versions: cdc-3.4.0
Environment: * Flink CDC Version: 3.3+ (issue originally found on
3.4.0/06581fc, verified still present on master f5204243)
* Flink Version: 1.20+
* Affected Operators: PostTransformOperator
* Affected Files:
** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java`
** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformChangeInfo.java`
**Note:** I have a working implementation validated with real-world use cases.
Happy to contribute after community feedback on the approach.
Reporter: Vinay Sagar Gonabavi
The `PostTransformOperator` does not persist schema state, which prevents
dynamic projection updates from taking effect when a Flink CDC job is restarted
from a savepoint or checkpoint.
*Steps to Reproduce*
1. Create a Flink CDC pipeline with a transform projection that selects only
source columns:
```
transform:
  - source-table: mydb.users
    projection: user_id, username
```
2. Start the job and process data (downstream sink schema: `user_id, username`)
3. Create a savepoint
4. Update the pipeline definition to add a computed/transformed column:
```
transform:
  - source-table: mydb.users
    projection: user_id, username, UPPER(username) as username_upper
```
5. Restart the job from the savepoint with the updated pipeline definition
*Expected Behavior*
* The `PostTransformOperator` should detect that the projection has changed
* It should emit an `AddColumnEvent` for the new computed column (`username_upper`)
* Downstream operators (SchemaOperator, DataSinkWriter) should receive the schema change events
* The sink schema should be updated to include the computed column
* Data events should flow correctly with the new schema, including the computed field
* The pipeline should continue processing without data loss
*Actual Behavior*
* The `PostTransformOperator` has no restored state to compare against the current projection rules
* No schema change events are generated for the computed column
* The downstream sink schema remains unchanged (still only `user_id, username`)
* The computed column `username_upper` is missing from the sink
* Values computed for `username_upper` are never written to the sink
* Users must restart jobs from scratch (losing checkpoint/savepoint state) to get the new schema
*Proposed Solution*
Implement operator state management in `PostTransformOperator` to enable
dynamic projection evolution:
1. Add State Persistence:
* Store `PostTransformChangeInfo` (tableId, pre-transform schema, post-transform schema) in operator state
* Implement `initializeState()` to restore schemas on job restart
* Implement `snapshotState()` to persist schemas at checkpoints
2. Detect Schema Evolution:
* On restoration, recalculate the post-transform schema using the current projection rules
* Compare the recalculated schema with the restored schema
* Detect added columns (including computed columns with UDFs/expressions)
* Detect removed columns
* Generate `AddColumnEvent` / `DropColumnEvent` as needed
3. Emit Schema Changes:
* Queue schema change events for each affected table
* Emit queued events before processing the first data event for each table
* This ensures downstream operators receive schema updates before data
4. Backward Compatibility:
* Support deserialization of the old state format (for upgrading from versions without state)
* Use versioned serialization with a magic marker to distinguish formats
* Gracefully handle missing state (first-time run or upgrade scenario)
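Steps 2 and 3 can be sketched as a name-based diff plus a per-table queue that is flushed ahead of that table's next data event. This is a minimal, self-contained sketch under stated assumptions: `ProjectionDiffSketch`, `SchemaChange`, `onRestore` and `onDataEvent` are hypothetical names, schemas are reduced to ordered column-name lists, and strings stand in for the real `AddColumnEvent` / `DropColumnEvent` and data events. Flink operator state is not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

public class ProjectionDiffSketch {

    // Hypothetical stand-in for a schema change event; a real operator
    // would emit Flink CDC's AddColumnEvent / DropColumnEvent instead.
    record SchemaChange(String tableId, String kind, String column) {}

    // Pending schema changes per table, flushed before that table's next data event.
    private final Map<String, Queue<SchemaChange>> pending = new HashMap<>();

    /** Compares restored post-transform columns with the recalculated ones, by name only. */
    static List<SchemaChange> diff(String tableId, List<String> restored, List<String> recalculated) {
        Set<String> before = new LinkedHashSet<>(restored);
        Set<String> after = new LinkedHashSet<>(recalculated);
        List<SchemaChange> changes = new ArrayList<>();
        for (String column : after) {
            if (!before.contains(column)) {
                changes.add(new SchemaChange(tableId, "ADD", column));
            }
        }
        for (String column : before) {
            if (!after.contains(column)) {
                changes.add(new SchemaChange(tableId, "DROP", column));
            }
        }
        return changes;
    }

    /** Called on restore, once per table found in the restored state. */
    void onRestore(String tableId, List<String> restored, List<String> recalculated) {
        List<SchemaChange> changes = diff(tableId, restored, recalculated);
        if (!changes.isEmpty()) {
            pending.computeIfAbsent(tableId, k -> new ArrayDeque<>()).addAll(changes);
        }
    }

    /** Called per data event: emits any queued changes for the table first, then the row. */
    List<String> onDataEvent(String tableId, String row) {
        List<String> out = new ArrayList<>();
        Queue<SchemaChange> queue = pending.remove(tableId);
        if (queue != null) {
            for (SchemaChange change : queue) {
                out.add(change.kind() + " " + change.column());
            }
        }
        out.add("DATA " + row);
        return out;
    }

    public static void main(String[] args) {
        ProjectionDiffSketch op = new ProjectionDiffSketch();
        op.onRestore("mydb.users",
                List.of("user_id", "username"),
                List.of("user_id", "username", "username_upper"));
        System.out.println(op.onDataEvent("mydb.users", "1,alice,ALICE")); // [ADD username_upper, DATA 1,alice,ALICE]
        System.out.println(op.onDataEvent("mydb.users", "2,bob,BOB"));     // [DATA 2,bob,BOB]
    }
}
```

Because the diff is by column name, a type change or reordering of an existing column would go unnoticed. A fuller version would also compare data types and column positions before deciding which events to emit.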
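Items 1 and 4 could share one serializer whose output starts with a magic marker and a version number. The sketch below illustrates that idea only. `VersionedStateSketch`, `ChangeInfo`, `MAGIC` and the byte layout are hypothetical, the schemas are reduced to column-name lists, and the "legacy" layout is an assumption made for illustration. It is not the actual format written by any existing `PostTransformChangeInfo` serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class VersionedStateSketch {

    // Negative on purpose: the assumed legacy layout starts with a non-negative
    // entry count, so a legacy blob can never begin with this value.
    static final int MAGIC = 0xCDC0FFEE;
    static final int VERSION = 1;

    // Hypothetical, simplified PostTransformChangeInfo: column names only.
    record ChangeInfo(String tableId, List<String> preColumns, List<String> postColumns) {}

    static byte[] serialize(List<ChangeInfo> infos) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(MAGIC);
            out.writeInt(VERSION);
            out.writeInt(infos.size());
            for (ChangeInfo info : infos) {
                out.writeUTF(info.tableId());
                writeColumns(out, info.preColumns());
                writeColumns(out, info.postColumns());
            }
        }
        return bytes.toByteArray();
    }

    static List<ChangeInfo> deserialize(byte[] data) throws IOException {
        List<ChangeInfo> result = new ArrayList<>();
        if (data == null || data.length == 0) {
            return result; // no state: first run, or upgrade from a version that stored none
        }
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int head = in.readInt();
        if (head == MAGIC) {
            int version = in.readInt();
            if (version != VERSION) {
                throw new IOException("Unsupported state version: " + version);
            }
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String tableId = in.readUTF();
                List<String> pre = readColumns(in);
                List<String> post = readColumns(in);
                result.add(new ChangeInfo(tableId, pre, post));
            }
        } else if (head >= 0) {
            // Assumed legacy layout: count, then tableId + post-transform columns per entry.
            for (int i = 0; i < head; i++) {
                String tableId = in.readUTF();
                List<String> post = readColumns(in);
                // Pre-transform columns were not stored, so they start out unknown (empty).
                result.add(new ChangeInfo(tableId, List.of(), post));
            }
        } else {
            throw new IOException("Unrecognized state header: " + head);
        }
        return result;
    }

    private static void writeColumns(DataOutputStream out, List<String> columns) throws IOException {
        out.writeInt(columns.size());
        for (String column : columns) {
            out.writeUTF(column);
        }
    }

    private static List<String> readColumns(DataInputStream in) throws IOException {
        int n = in.readInt();
        List<String> columns = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            columns.add(in.readUTF());
        }
        return columns;
    }

    public static void main(String[] args) throws IOException {
        List<ChangeInfo> infos = List.of(new ChangeInfo("mydb.users",
                List.of("user_id", "username"), List.of("user_id", "username")));
        System.out.println(deserialize(serialize(infos)).equals(infos)); // true
    }
}
```

The marker only works if no valid old-format blob can start with the same bytes. In this sketch that is guaranteed by making the marker negative while the assumed legacy count is non-negative. A real serializer would have to make the same argument against whatever the previous format actually writes first.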
The proposal matters most for computed columns because, unlike source columns
(which come from `CreateTableEvent`), they are:
* Defined entirely in the projection (e.g., `UPPER(name) as name_upper`)
* Calculated by `PostTransformOperator`
* Not present in the source schema
* Require explicit schema change events to appear downstream
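Since a computed column's name exists only in the projection text, one way to see which columns need explicit events is to derive the projection's output names and check which items are not plain source-column references. The sketch below does this with a deliberately naive parser. `ComputedColumnSketch`, `splitProjection` and `computedColumns` are hypothetical names, and the parser is no substitute for a real SQL parser. It only handles top-level commas, parentheses, single-quoted strings, `expr AS alias`, and a bare `*`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ComputedColumnSketch {

    // Matches "<expr> AS <alias>" at the end of a projection item (case-insensitive).
    private static final Pattern ALIAS =
            Pattern.compile("(?is)^(.*)\\s+as\\s+([A-Za-z_][A-Za-z0-9_]*)\\s*$");

    /** Splits a projection on top-level commas, ignoring commas inside (...) or '...'. */
    static List<String> splitProjection(String projection) {
        List<String> items = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int depth = 0;
        boolean inQuote = false;
        for (char ch : projection.toCharArray()) {
            if (ch == '\'') {
                inQuote = !inQuote;
            } else if (!inQuote && ch == '(') {
                depth++;
            } else if (!inQuote && ch == ')') {
                depth--;
            } else if (!inQuote && depth == 0 && ch == ',') {
                items.add(current.toString().trim());
                current.setLength(0);
                continue; // the separator itself is not part of any item
            }
            current.append(ch);
        }
        if (!current.toString().isBlank()) {
            items.add(current.toString().trim());
        }
        return items;
    }

    /** Returns output names of projection items that are not plain source-column references. */
    static List<String> computedColumns(String projection, Set<String> sourceColumns) {
        List<String> computed = new ArrayList<>();
        for (String item : splitProjection(projection)) {
            Matcher m = ALIAS.matcher(item);
            boolean aliased = m.matches();
            String expr = aliased ? m.group(1).trim() : item;
            String name = aliased ? m.group(2) : item;
            if (expr.equals("*")) {
                continue; // wildcard expands to source columns, none of them computed
            }
            if (!sourceColumns.contains(expr)) {
                computed.add(name);
            }
        }
        return computed;
    }

    public static void main(String[] args) {
        Set<String> source = Set.of("user_id", "username");
        System.out.println(computedColumns("user_id, username", source)); // []
        System.out.println(computedColumns(
                "user_id, username, UPPER(username) as username_upper", source)); // [username_upper]
    }
}
```

For the projections in the reproduction steps, the first yields no computed columns and the second yields `username_upper`. That is exactly the column for which no `CreateTableEvent` exists and for which an `AddColumnEvent` has to be synthesized.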
--
This message was sent by Atlassian Jira
(v8.20.10#820010)