[
https://issues.apache.org/jira/browse/FLINK-38828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Vinay Sagar Gonabavi updated FLINK-38828:
-----------------------------------------
Environment:
* Flink CDC Version: 3.3+ (issue originally found on 3.4.0/06581fc, verified
still present on master f5204243)
* Flink Version: 1.20+
* Affected Operators: PostTransformOperator
* Affected Files:
** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java`
** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformChangeInfo.java`
was:
* Flink CDC Version: 3.3+ (issue originally found on 3.4.0/06581fc, verified
still present on master f5204243)
* Flink Version: 1.20+
* Affected Operators: PostTransformOperator
* Affected Files:
** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java`
** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformChangeInfo.java`
*Note:* I have a working implementation validated with real-world use cases.
Happy to contribute after community feedback on the approach.
> PostTransformOperator needs to detect dynamic projection updates when
> restarting from checkpoint/savepoint
> ----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38828
> URL: https://issues.apache.org/jira/browse/FLINK-38828
> Project: Flink
> Issue Type: New Feature
> Components: Flink CDC
> Affects Versions: cdc-3.4.0
> Environment: * Flink CDC Version: 3.3+ (issue originally found on
> 3.4.0/06581fc, verified still present on master f5204243)
> * Flink Version: 1.20+
> * Affected Operators: PostTransformOperator
> * Affected Files:
> ** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java`
> ** `flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformChangeInfo.java`
> Reporter: Vinay Sagar Gonabavi
> Priority: Major
> Labels: Flink-CDC
>
> The `PostTransformOperator` does not persist schema state, which prevents
> dynamic projection updates from taking effect when a Flink CDC job is restarted
> from a savepoint or checkpoint.
>
> *Steps to Reproduce*
> 1. Create a Flink CDC pipeline with a transform projection that contains only
> source columns:
> {code:yaml}
> transform:
>   - source-table: mydb.users
>     projection: user_id, username
> {code}
> 2. Start the job and process data (downstream sink schema: `user_id,
> username`)
> 3. Create a savepoint
> 4. Update the pipeline definition to add a computed/transformed column:
> {code:yaml}
> transform:
>   - source-table: mydb.users
>     projection: user_id, username, UPPER(username) as username_upper
> {code}
> 5. Restart the job from the savepoint with the updated pipeline definition
>
> *Expected Behavior*
> * The `PostTransformOperator` should detect that the projection has changed
> * It should emit `AddColumnEvent` for the new computed column
> (`username_upper`)
> * Downstream operators (SchemaOperator, DataSinkWriter) should receive the
> schema change events
> * Sink schema should be updated to include the computed column
> * Data events should flow correctly with the new schema including the
> computed field
> * The pipeline should continue processing without data loss
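Detecting that "the projection has changed" boils down to a column-level comparison between the post-transform schema restored from state and the one recalculated from the current projection rules. The sketch below is a minimal illustration under stated assumptions: `Column`, `Change` and `Kind` are hypothetical simplified stand-ins for Flink CDC's schema columns and `AddColumnEvent` / `DropColumnEvent`, columns are matched by name only, and type changes or renames are deliberately not handled.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Sketch only: Column, Change and Kind are hypothetical stand-ins, not Flink CDC classes.
final class ProjectionSchemaDiff {

    enum Kind { ADD, DROP }

    /** A post-transform column: name plus a type name such as "STRING". */
    record Column(String name, String type) {}

    /** One schema change; afterColumn is a positioning hint for ADD (null = first column). */
    record Change(Kind kind, Column column, String afterColumn) {}

    /**
     * Compares the schema restored from operator state with the schema recalculated
     * from the current projection. Matches columns by name only.
     */
    static List<Change> diff(List<Column> restored, List<Column> recalculated) {
        Set<String> restoredNames = names(restored);
        Set<String> currentNames = names(recalculated);
        List<Change> changes = new ArrayList<>();

        // Columns the current projection produces that the restored schema lacks,
        // e.g. a new computed column such as UPPER(username) AS username_upper.
        String previous = null;
        for (Column c : recalculated) {
            if (!restoredNames.contains(c.name())) {
                changes.add(new Change(Kind.ADD, c, previous));
            }
            previous = c.name();
        }

        // Columns in the restored schema that the current projection no longer produces.
        for (Column c : restored) {
            if (!currentNames.contains(c.name())) {
                changes.add(new Change(Kind.DROP, c, null));
            }
        }
        return changes;
    }

    private static Set<String> names(List<Column> columns) {
        Set<String> result = new LinkedHashSet<>();
        for (Column c : columns) {
            result.add(c.name());
        }
        return result;
    }
}
```

For the reproduction above, diffing the restored `(user_id, username)` against the recalculated `(user_id, username, username_upper)` yields a single ADD for `username_upper` positioned after `username`; reversing the arguments yields a single DROP.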
> *Actual Behavior*
> * The `PostTransformOperator` has no state to compare against current
> projection rules
> * No schema change events are generated for the computed column
> * Downstream sink schema remains unchanged (still only `user_id, username`)
> * The computed column `username_upper` is missing from the sink
> * Values produced by the new `UPPER(username)` expression never reach the sink
> * Users must restart jobs from scratch (losing checkpoint/savepoint state)
> to get the new schema
>
> *Proposed Solution*
> Implement operator state management in `PostTransformOperator` to enable
> dynamic projection evolution:
> 1. Add State Persistence:
> * Store `PostTransformChangeInfo` (tableId, pre-transform schema, post-transform schema) in operator state
> * Implement `initializeState()` to restore schemas on job restart
> * Implement `snapshotState()` to persist schemas at checkpoints
> 2. Detect Schema Evolution:
> * On restoration, recalculate post-transform schema using current projection rules
> * Compare recalculated schema with restored schema
> * Detect added columns (including computed columns with UDFs/expressions)
> * Detect removed columns
> * Generate `AddColumnEvent` / `DropColumnEvent` as needed
> 3. Emit Schema Changes:
> * Queue schema change events for each affected table
> * Emit queued events before processing first data event for each table
> * Ensures downstream operators receive schema updates before data
> 4. Backward Compatibility:
> * Support deserialization of old state format (for upgrading from versions
> without state)
> * Use versioned serialization with magic marker to distinguish formats
> * Gracefully handle missing state (first-time run or upgrade scenario)
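Steps 1 and 4 can be illustrated together: each persisted entry starts with a magic marker and a version number, so a reader can tell new-format bytes from older ones. This is a minimal sketch, not the proposed implementation. `ChangeInfoState` is a hypothetical flattened stand-in for what `PostTransformChangeInfo` would hold, the magic value is arbitrary, and the "legacy" layout (table id plus pre-transform columns, no post-transform schema) is an assumption made purely for illustration. The Flink state plumbing that would call these methods from `snapshotState()` / `initializeState()` is omitted; a table with no entry at all is simply treated as a first run.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch only: ChangeInfoState and the legacy layout are hypothetical.
final class ChangeInfoStateSerializer {

    record ChangeInfoState(String tableId, List<String> preColumns, List<String> postColumns) {}

    // Arbitrary marker ("PTCI" in ASCII); new-format entries always start with it.
    static final int MAGIC = 0x50544349;
    static final int CURRENT_VERSION = 1;

    static byte[] serialize(ChangeInfoState state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(MAGIC);
            out.writeInt(CURRENT_VERSION);
            out.writeUTF(state.tableId());
            writeColumns(out, state.preColumns());
            writeColumns(out, state.postColumns());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static ChangeInfoState deserialize(byte[] data) {
        // Peek at the first 4 bytes (big-endian, like DataOutputStream) to pick the format.
        boolean versioned = data.length >= 8 && ByteBuffer.wrap(data).getInt(0) == MAGIC;
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            if (!versioned) {
                // Hypothetical legacy layout: table id and pre-transform columns only.
                // postColumns == null means "no baseline", so the caller skips the diff.
                return new ChangeInfoState(in.readUTF(), readColumns(in), null);
            }
            in.readInt(); // magic, already checked above
            int version = in.readInt();
            if (version != CURRENT_VERSION) {
                throw new IOException("Unsupported state version " + version);
            }
            // Arguments are evaluated left to right, matching the write order.
            return new ChangeInfoState(in.readUTF(), readColumns(in), readColumns(in));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeColumns(DataOutputStream out, List<String> columns) throws IOException {
        out.writeInt(columns.size());
        for (String column : columns) {
            out.writeUTF(column);
        }
    }

    private static List<String> readColumns(DataInputStream in) throws IOException {
        int count = in.readInt();
        List<String> columns = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            columns.add(in.readUTF());
        }
        return columns;
    }
}
```

Keeping the version number next to the marker leaves room for later format changes without another marker; the remaining risk is legacy bytes that happen to start with the marker, which a sufficiently unusual value makes unlikely but not impossible.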
> Computed columns need this explicit handling because, unlike source columns
> (which come from `CreateTableEvent`), they are:
> * Defined entirely in the projection (e.g., `UPPER(name) as name_upper`)
> * Calculated by `PostTransformOperator`
> * Not present in the source schema
> * Require explicit schema change events to appear downstream
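Because a computed column only appears downstream through such events, the ordering in step 3 of the proposal matters: a table's queued schema changes must be emitted before its first data event. The sketch below shows one way to get that ordering, assuming events are modelled as plain strings and a `Consumer<String>` stands in for the operator's output; `PendingSchemaChanges` and its methods are hypothetical names, not Flink CDC API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Sketch only: events are strings and the Consumer stands in for the operator output.
final class PendingSchemaChanges {
    private final Map<String, Deque<String>> pendingByTable = new HashMap<>();
    private final Consumer<String> downstream;

    PendingSchemaChanges(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    /** Called during restore, once per change found by comparing the schemas. */
    void enqueue(String tableId, String schemaChangeEvent) {
        pendingByTable.computeIfAbsent(tableId, k -> new ArrayDeque<>()).add(schemaChangeEvent);
    }

    /** Called for every data event; drains that table's queue first, exactly once. */
    void onDataEvent(String tableId, String dataEvent) {
        Deque<String> pending = pendingByTable.remove(tableId);
        if (pending != null) {
            pending.forEach(downstream); // schema changes in the order they were queued
        }
        downstream.accept(dataEvent);
    }
}
```

Removing the queue on first use keeps the per-record cost to a single map lookup. One open point the sketch ignores: if a table receives no data before the next checkpoint, its pending events would need to survive in state (or be flushed another way) so they are not lost.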
>
> Note: I have a working implementation tested against a real-world pipeline.
> Happy to contribute after community feedback on the approach.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)