VinaySagarGonabavi opened a new pull request, #4299:
URL: https://github.com/apache/flink-cdc/pull/4299

   ## What is the purpose of the change
   This is a second (v2) approach following feedback on PR #4280: schema 
evolution responsibility stays in `SchemaCoordinator` instead of adding state 
to `PostTransformOperator`, which keeps the transform operator stateless as 
intended by PR #4056.
   
   When a Flink CDC pipeline uses transform projections and the pipeline is 
restarted from a savepoint/checkpoint with an updated projection, the 
downstream sink never receives schema change events for the new/removed 
columns, leaving the sink schema stale and out of sync with the actual 
projected data.
   
   This PR fixes the issue by teaching `SchemaCoordinator` to detect schema 
differences when a `CreateTableEvent` arrives for an already-known table. 
Instead of treating the event as redundant, the coordinator computes the 
schema diff and emits the appropriate 
`AddColumnEvent`/`DropColumnEvent`/`AlterColumnTypeEvent` downstream. 
`PostTransformOperator` remains stateless: it is a pure mapping node.
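
   As a rough illustration of the diff step described above, here is a 
minimal, self-contained sketch. It does not use the Flink CDC classes: 
`SchemaDiffSketch`, the map-based schema model, and the string event labels 
are all hypothetical stand-ins, and the order in which events are produced 
is an assumption of this sketch rather than the coordinator's actual 
behavior.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a toy model of the diff step, not Flink CDC code. */
public class SchemaDiffSketch {

    /**
     * Compares the schema already known for a table with the schema carried by a
     * repeated CreateTableEvent. Both maps go from column name to type name.
     * Returns human-readable labels for the events that would be emitted.
     */
    public static List<String> diff(Map<String, String> known, Map<String, String> incoming) {
        List<String> events = new ArrayList<>();
        // Columns that are new or whose type changed in the incoming schema.
        for (Map.Entry<String, String> col : incoming.entrySet()) {
            String oldType = known.get(col.getKey());
            if (oldType == null) {
                events.add("AddColumnEvent(" + col.getKey() + " " + col.getValue() + ")");
            } else if (!oldType.equals(col.getValue())) {
                events.add("AlterColumnTypeEvent(" + col.getKey() + ": "
                        + oldType + " -> " + col.getValue() + ")");
            }
        }
        // Columns that no longer appear in the incoming schema.
        for (String name : known.keySet()) {
            if (!incoming.containsKey(name)) {
                events.add("DropColumnEvent(" + name + ")");
            }
        }
        return events;
    }

    public static void main(String[] args) {
        Map<String, String> known = new LinkedHashMap<>();
        known.put("id", "INT");
        known.put("name", "STRING");
        known.put("age", "INT");

        // Schema after the projection was changed and the job was restored.
        Map<String, String> incoming = new LinkedHashMap<>();
        incoming.put("id", "BIGINT");
        incoming.put("name", "STRING");
        incoming.put("email", "STRING");

        for (String event : diff(known, incoming)) {
            System.out.println(event);
        }
    }
}
```

   An identical incoming schema yields an empty list, which corresponds to 
the no-op case where nothing is sent downstream.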
   
   ## Brief change log
   
   - `SchemaUtils.isSchemaChangeEventRedundant()`: a `CreateTableEvent` is now 
redundant only if the table exists AND the schema is identical (previously it 
was only an existence check)
   - `SchemaCoordinator.deduceEvolvedSchemaChanges()`: when a 
`CreateTableEvent` arrives for an existing table in 1:1 routing, computes the 
schema diff via `SchemaMergingUtils.getSchemaDifference()` and returns the 
diff events instead of forwarding the raw `CreateTableEvent`
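
   The change to the redundancy rule can be sketched as follows. This is a 
simplified model, not the real `SchemaUtils` code: 
`CreateTableRedundancySketch`, both method names, and the representation of 
a schema as a list of column strings are hypothetical and exist only to 
contrast the old existence-only rule with the new rule.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: contrasts the old and new redundancy rules on a toy model. */
public class CreateTableRedundancySketch {

    /** Old rule: any CreateTableEvent for an already-known table is redundant. */
    public static boolean redundantByExistence(
            Map<String, List<String>> known, String tableId) {
        return known.containsKey(tableId);
    }

    /** New rule: redundant only if the table is known AND the schema is identical. */
    public static boolean redundantBySchema(
            Map<String, List<String>> known, String tableId, List<String> incomingColumns) {
        List<String> current = known.get(tableId);
        return current != null && current.equals(incomingColumns);
    }

    public static void main(String[] args) {
        Map<String, List<String>> known = new HashMap<>();
        known.put("db.orders", Arrays.asList("id INT", "name STRING"));

        // Same table, but the projection now also selects an extra column.
        List<String> changed = Arrays.asList("id INT", "name STRING", "email STRING");

        // The old rule would drop the event; the new rule lets it through for diffing.
        System.out.println("old rule redundant: " + redundantByExistence(known, "db.orders"));
        System.out.println("new rule redundant: "
                + redundantBySchema(known, "db.orders", changed));
    }
}
```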
   
   **Note**: `isSchemaChangeEventRedundant` is also called by Hudi and Paimon 
sink connectors. The behavioral change is a bugfix for those paths as well 
(they were silently dropping CreateTableEvents with changed schemas), but no 
connector-specific code was modified.
   
   ## Verifying this change
   3 new test methods across 3 files:
   
   - `SchemaUtilsTest`: redundancy checks for `CreateTableEvent` (non-existent 
table, same schema, extra columns, type diff, fewer columns)
   - `SchemaMergingUtilsTest`: diff scenarios for projection changes (add 
column, drop column, type change, swap columns, identical schemas)
   - `SchemaEvolveTest`: end-to-end through `SchemaOperator` + 
`SchemaCoordinator` (add column, drop column, type change, identical schema as 
a no-op)
   
   ## Does this pull request potentially affect one of the following parts
   
   - Dependencies: no
   - The public API: no
   - The runtime per-record code path: no (only affects schema change event 
processing)
   - Anything that affects deployment or recovery: yes (changes how 
CreateTableEvent is handled after checkpoint restore)
   
   ## Documentation
   
   - Does this pull request introduce a new feature? No (fixes existing 
behavior gap)

