VinaySagarGonabavi opened a new pull request, #4280:
URL: https://github.com/apache/flink-cdc/pull/4280

   ## What is the purpose of the change
   
   When a Flink CDC pipeline uses transform projections (e.g., `*, UPPER(col) 
AS computed_col`) and the pipeline is restarted from a savepoint/checkpoint 
with an updated projection (e.g., adding or removing a computed column), the 
`PostTransformOperator` does not detect the projection change. The downstream 
sink never receives schema change events for the new or removed columns, 
leaving the sink schema stale and out of sync with the actual projected data.
   
   This PR adds state persistence to `PostTransformOperator` so that on 
restore, it can detect differences between the old (checkpointed) 
post-transform schema and the newly calculated one, and emit appropriate 
`AddColumnEvent`/`DropColumnEvent` before processing data.
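
The diffing step described above can be sketched in isolation. This is a minimal illustration, not the PR's code: `SchemaDiffSketch`, its `diff` method, and the string-based change descriptions are hypothetical stand-ins for the real schema and event types, and the drop-before-add ordering is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not part of the PR. */
final class SchemaDiffSketch {

    /**
     * Compares the checkpointed column names with the newly calculated ones
     * and describes each difference as a readable string.
     */
    static List<String> diff(List<String> oldColumns, List<String> newColumns) {
        List<String> changes = new ArrayList<>();

        // Columns in the checkpointed schema that are missing now were dropped.
        for (String column : oldColumns) {
            if (!newColumns.contains(column)) {
                changes.add("DROP " + column);
            }
        }

        // Columns that exist only in the new schema were added. Their position
        // is derived from where they sit in the new schema.
        for (int i = 0; i < newColumns.size(); i++) {
            String column = newColumns.get(i);
            if (oldColumns.contains(column)) {
                continue;
            }
            if (i == 0) {
                changes.add("ADD " + column + " FIRST");
            } else if (i == newColumns.size() - 1) {
                changes.add("ADD " + column + " LAST");
            } else {
                changes.add("ADD " + column + " AFTER " + newColumns.get(i - 1));
            }
        }
        return changes;
    }
}
```

An empty result corresponds to the no-op case in which the restored and recalculated schemas are identical.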
   
   ## Brief change log
   
   - **`PostTransformChangeInfo.java`**: Added `Serializer` inner class 
implementing `SimpleVersionedSerializer<PostTransformChangeInfo>` with 
backward-compatible deserialization using a magic marker pattern 
(`__magic_post_transform__`)
   
   - **`PostTransformOperator.java`**:
     - Added `initializeState()` / `snapshotState()` overrides using union list 
state (`ListState<byte[]>`)
     - On restore, recalculates post-transform schemas under the current 
projection rules, using all effective transformers and 
`SchemaMergingUtils.strictlyMergeSchemas()` (matching the 
`processCreateTableEvent` logic), then generates 
`AddColumnEvent`/`DropColumnEvent` for the detected differences
     - Emits pending schema changes before the first `DataChangeEvent` per table
     - `CreateTableEvent` clears pending changes for the table (fresh schema 
supersedes old diff)
     - Moved `invalidateCache(tableId)` so that it runs before processing 
`CreateTableEvent`/`SchemaChangeEvent` (previously it ran after), ensuring 
that fresh processors are created with the new schema
     - Modified `open()` with guards against double-initialization since 
`initializeState()` now runs before `open()`
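
The magic marker pattern mentioned for the serializer can be illustrated roughly as follows. This is a sketch under stated assumptions: only the marker string `__magic_post_transform__` comes from this description, while the class name, the version constant, the byte layout, and the use of a plain table ID string as the payload are hypothetical and do not reflect the actual wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical illustration of marker-based backward compatibility. */
final class MagicMarkerSketch {
    static final String MAGIC = "__magic_post_transform__";
    static final int VERSION = 1; // assumed version number

    /** New format (assumed layout): marker, version, then payload. */
    static byte[] serialize(String tableId) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(MAGIC);
            out.writeInt(VERSION);
            out.writeUTF(tableId);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Old format (assumed layout): payload only, no marker or version. */
    static byte[] serializeLegacy(String tableId) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(tableId);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads either format by checking whether the first string is the marker. */
    static String deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String first = in.readUTF();
            if (!MAGIC.equals(first)) {
                // No marker: the first string is already the legacy payload.
                return first;
            }
            in.readInt(); // version, unused in this sketch
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The approach relies on the marker never being a valid leading value in the old format, which is why an unlikely string is chosen.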
   
   ## Verifying this change
   
   This change adds 16 new tests:
   
   **Unit tests (`PostTransformChangeInfoTest.java` — 7 tests):**
   - Serializer version number verification
   - Roundtrip serialization with simple schema
   - Roundtrip serialization with complex schema (TIMESTAMP, DECIMAL, BYTES)
   - Roundtrip with single-part TableId (no namespace/schema)
   - Backward-compatible deserialization of old format (no magic marker)
   - Backward-compatible deserialization of old format with complex schema
   - Equivalence between new and old format deserialization
   
   **Integration tests (`TransformOperatorWithSchemaEvolveTest.java` — 9 
tests):**
   - `generateSchemaChangeEvents` produces correct `AddColumnEvent` at LAST 
position
   - `generateSchemaChangeEvents` produces correct `AddColumnEvent` with AFTER 
position
   - `generateSchemaChangeEvents` produces correct `AddColumnEvent` at FIRST 
position
   - `generateSchemaChangeEvents` produces correct `DropColumnEvent`
   - Mixed add/drop column changes in a single diff
   - No-op when schemas are identical
   - Pending schema changes emitted before `DataChangeEvent`
   - `CreateTableEvent` clears pending changes
   - Multiple pending schema changes emitted in order
   
   ## Does this pull request potentially affect one of the following parts
   
   - Dependencies: no
   - The public API: no
   - The runtime per-record code path: yes (adds a single `HashMap.remove()` 
call per `DataChangeEvent`, which returns `null` after the first event per 
table, so the overhead is negligible)
   - Anything that affects deployment or recovery: yes (adds operator state for 
checkpoint/savepoint persistence)
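
To make the per-record cost concrete, the pending-change handling can be sketched as below. This is an illustration only: `PendingChangesSketch`, its method names, and the use of strings in place of real event and `TableId` types are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of draining pending schema changes once per table. */
final class PendingChangesSketch {
    private final Map<String, List<String>> pending = new HashMap<>();

    /** Registers schema changes detected on restore for a table. */
    void addPending(String tableId, List<String> schemaChanges) {
        pending.put(tableId, new ArrayList<>(schemaChanges));
    }

    /** A fresh schema supersedes the old diff, so pending changes are dropped. */
    void onCreateTable(String tableId) {
        pending.remove(tableId);
    }

    /** Returns the events to emit for one data change, pending changes first. */
    List<String> onDataChange(String tableId, String dataEvent) {
        List<String> output = new ArrayList<>();
        // remove() returns null once the table's pending changes were drained.
        List<String> changes = pending.remove(tableId);
        if (changes != null) {
            output.addAll(changes);
        }
        output.add(dataEvent);
        return output;
    }
}
```

After the first data change for a table, the lookup finds no entry and the method only forwards the data event.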
   
   ## Documentation
   
   - Does this pull request introduce a new feature? No (fixes existing 
behavior gap)
   - If yes, how is the feature documented? N/A

