VinaySagarGonabavi opened a new pull request, #4280:
URL: https://github.com/apache/flink-cdc/pull/4280
## What is the purpose of the change
When a Flink CDC pipeline uses transform projections (e.g., `*, UPPER(col)
AS computed_col`) and the pipeline is restarted from a savepoint/checkpoint
with an updated projection (e.g., adding or removing a computed column), the
`PostTransformOperator` does not detect the projection change. The downstream
sink never receives schema change events for the new or removed columns,
leaving the sink schema stale and out of sync with the actual projected data.
This PR adds state persistence to `PostTransformOperator` so that on
restore, it can detect differences between the old (checkpointed)
post-transform schema and the newly calculated one, and emit appropriate
`AddColumnEvent`/`DropColumnEvent` before processing data.
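To make the restore-time comparison concrete, here is a minimal sketch of turning an old and a new list of projected column names into drop and add operations. Add positions are expressed as FIRST, AFTER or LAST, the same positions the tests exercise. This is one plausible approach, not necessarily how the PR computes the diff. `SchemaDiffSketch`, `Change` and `Position` are hypothetical stand-ins rather than Flink CDC's `AddColumnEvent`/`DropColumnEvent` classes. The sketch compares column names only, so type changes and reordering of surviving columns are out of scope.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: diff two column-name lists into add/drop changes. */
final class SchemaDiffSketch {

    enum Position { FIRST, LAST, AFTER }

    /** Simplified stand-in for AddColumnEvent / DropColumnEvent (not the Flink CDC classes). */
    static final class Change {
        final boolean add;        // true = add column, false = drop column
        final String column;
        final Position position;  // null for drops
        final String afterColumn; // only set when position == AFTER

        Change(boolean add, String column, Position position, String afterColumn) {
            this.add = add;
            this.column = column;
            this.position = position;
            this.afterColumn = afterColumn;
        }

        @Override
        public String toString() {
            if (!add) {
                return "DROP " + column;
            }
            return "ADD " + column + " " + position + (afterColumn == null ? "" : " " + afterColumn);
        }
    }

    /** Drops first, then adds in new-schema order so every AFTER reference already exists. */
    static List<Change> diff(List<String> oldCols, List<String> newCols) {
        Set<String> oldSet = new HashSet<>(oldCols);
        Set<String> newSet = new HashSet<>(newCols);
        List<Change> changes = new ArrayList<>();
        for (String col : oldCols) {
            if (!newSet.contains(col)) {
                changes.add(new Change(false, col, null, null));
            }
        }
        for (int i = 0; i < newCols.size(); i++) {
            String col = newCols.get(i);
            if (oldSet.contains(col)) {
                continue; // column survives; reordering and type changes are ignored here
            }
            if (i == 0) {
                changes.add(new Change(true, col, Position.FIRST, null));
            } else if (i == newCols.size() - 1) {
                changes.add(new Change(true, col, Position.LAST, null));
            } else {
                changes.add(new Change(true, col, Position.AFTER, newCols.get(i - 1)));
            }
        }
        return changes;
    }

    /** Replays changes on a copy of the old column list to check that they reproduce the new one. */
    static List<String> apply(List<String> cols, List<Change> changes) {
        List<String> result = new ArrayList<>(cols);
        for (Change c : changes) {
            if (!c.add) {
                result.remove(c.column);
            } else if (c.position == Position.FIRST) {
                result.add(0, c.column);
            } else if (c.position == Position.LAST) {
                result.add(c.column);
            } else {
                result.add(result.indexOf(c.afterColumn) + 1, c.column);
            }
        }
        return result;
    }
}
```

The sketch emits drops first and then adds in new-schema order. As a result, every `AFTER` reference names a column that already exists when the change is applied. Replaying the changes with `apply` on the old list reproduces the new list.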
## Brief change log
- **`PostTransformChangeInfo.java`**: Added `Serializer` inner class
implementing `SimpleVersionedSerializer<PostTransformChangeInfo>` with
backward-compatible deserialization using a magic marker pattern
(`__magic_post_transform__`)
- **`PostTransformOperator.java`**:
  - Added `initializeState()` / `snapshotState()` overrides using union list
    state (`ListState<byte[]>`)
  - On restore, recalculates post-transform schemas with the current projection
    rules, using all effective transformers and
    `SchemaMergingUtils.strictlyMergeSchemas()` (matching the `processCreateTableEvent`
    logic), and generates `AddColumnEvent`/`DropColumnEvent` for any detected differences
  - Emits pending schema changes before the first `DataChangeEvent` of each table
  - A `CreateTableEvent` clears the pending changes for its table (the fresh schema
    supersedes the old diff)
  - Moved the `invalidateCache(tableId)` call so that it runs before, rather than
    after, processing `CreateTableEvent`/`SchemaChangeEvent`, ensuring that fresh
    processors are created with the new schema
  - Added guards in `open()` against double initialization, since the overridden
    `initializeState()` runs before `open()`
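The magic-marker idea behind the backward-compatible `Serializer` can be sketched with plain `java.io` streams. The new format starts with the marker string. A reader that does not find the marker at the front falls back to parsing the bytes as the old, marker-less layout. The field layout here is an assumption for illustration and is not the actual `PostTransformChangeInfo` encoding. It consists of a table id followed by column names, plus a hypothetical format-version int written after the marker. `MagicMarkerSerializerSketch` and `Info` are hypothetical names. `deserialize(int, byte[])` only mirrors the shape of Flink's `SimpleVersionedSerializer` method and does not implement that interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of magic-marker backward compatibility (not the real PostTransformChangeInfo format). */
final class MagicMarkerSerializerSketch {

    static final String MAGIC = "__magic_post_transform__";
    static final int FORMAT_VERSION = 1; // hypothetical version written right after the marker

    /** Simplified stand-in for PostTransformChangeInfo: a table id plus column names. */
    static final class Info {
        final String tableId;
        final List<String> columns;

        Info(String tableId, List<String> columns) {
            this.tableId = tableId;
            this.columns = columns;
        }
    }

    /** New format: marker, format version, then the payload. */
    static byte[] serialize(Info info) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeUTF(MAGIC);
        out.writeInt(FORMAT_VERSION);
        writePayload(out, info);
        out.flush();
        return bytes.toByteArray();
    }

    /** Old format without a marker, kept only to demonstrate reading legacy state. */
    static byte[] serializeLegacy(Info info) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        writePayload(out, info);
        out.flush();
        return bytes.toByteArray();
    }

    /** Mirrors SimpleVersionedSerializer#deserialize(int, byte[]); the outer version is unused here. */
    static Info deserialize(int version, byte[] serialized) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
        String first = in.readUTF();
        String tableId;
        if (MAGIC.equals(first)) {
            int formatVersion = in.readInt();
            if (formatVersion != FORMAT_VERSION) {
                throw new IOException("Unknown format version " + formatVersion);
            }
            tableId = in.readUTF();
        } else {
            tableId = first; // legacy bytes: the first string is already the table id
        }
        int n = in.readInt();
        List<String> columns = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            columns.add(in.readUTF());
        }
        return new Info(tableId, columns);
    }

    private static void writePayload(DataOutputStream out, Info info) throws IOException {
        out.writeUTF(info.tableId);
        out.writeInt(info.columns.size());
        for (String c : info.columns) {
            out.writeUTF(c);
        }
    }
}
```

One caveat applies to this pattern. Legacy bytes whose first string happens to equal the marker would be misread, so the marker should be an identifier that cannot plausibly appear as real data.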
## Verifying this change
This change adds 16 new tests:
**Unit tests (`PostTransformChangeInfoTest.java` — 7 tests):**
- Serializer version number verification
- Roundtrip serialization with simple schema
- Roundtrip serialization with complex schema (TIMESTAMP, DECIMAL, BYTES)
- Roundtrip with single-part TableId (no namespace/schema)
- Backward-compatible deserialization of old format (no magic marker)
- Backward-compatible deserialization of old format with complex schema
- Equivalence between new and old format deserialization
**Integration tests (`TransformOperatorWithSchemaEvolveTest.java` — 9
tests):**
- `generateSchemaChangeEvents` produces correct `AddColumnEvent` at LAST
position
- `generateSchemaChangeEvents` produces correct `AddColumnEvent` with AFTER
position
- `generateSchemaChangeEvents` produces correct `AddColumnEvent` at FIRST
position
- `generateSchemaChangeEvents` produces correct `DropColumnEvent`
- Mixed add/drop column changes in a single diff
- No-op when schemas are identical
- Pending schema changes emitted before `DataChangeEvent`
- `CreateTableEvent` clears pending changes
- Multiple pending schema changes emitted in order
## Does this pull request potentially affect one of the following parts
- Dependencies: no
- The public API: no
- The runtime per-record code path: yes (adds a single `HashMap.remove()`
call per `DataChangeEvent`, which returns `null` for every event after the
first one per table, so the overhead is negligible)
- Anything that affects deployment or recovery: yes (adds operator state for
checkpoint/savepoint persistence)
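The per-record cost described above comes from a pattern like the following minimal sketch. Restore-time changes are parked in a map keyed by table. The first data record of a table drains them with a single `remove()`, and later records find nothing. A `CreateTableEvent` simply discards the parked changes. `PendingSchemaChangesSketch`, its methods, and the string-typed events are hypothetical simplifications of the operator and its `Output`, not the PR's code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of per-table pending schema changes; events are plain strings here. */
final class PendingSchemaChangesSketch {

    // Schema changes computed on restore, keyed by table id, not yet sent downstream.
    private final Map<String, List<String>> pending = new HashMap<>();
    // Stand-in for the operator's output: everything "emitted", in order.
    final List<String> emitted = new ArrayList<>();

    void addPendingOnRestore(String tableId, List<String> changes) {
        pending.put(tableId, new ArrayList<>(changes));
    }

    /** A fresh CreateTableEvent supersedes any diff computed against the old schema. */
    void processCreateTable(String tableId) {
        pending.remove(tableId);
        emitted.add("CREATE " + tableId);
    }

    /** One HashMap.remove() per record; it returns null for every record after the first per table. */
    void processDataChange(String tableId, String row) {
        List<String> changes = pending.remove(tableId);
        if (changes != null) {
            emitted.addAll(changes); // schema changes go out before the first data record
        }
        emitted.add("DATA " + tableId + " " + row);
    }
}
```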
## Documentation
- Does this pull request introduce a new feature? No (it fixes a gap in
existing behavior)
- If yes, how is the feature documented? N/A