VinaySagarGonabavi opened a new pull request, #4299: URL: https://github.com/apache/flink-cdc/pull/4299
## What is the purpose of the change

This is a v2 approach following feedback on PR #4280: schema evolution responsibility stays in `SchemaCoordinator` rather than adding state to `PostTransformOperator`, keeping the transform operator stateless as intended by PR #4056.

When a Flink CDC pipeline uses transform projections and the pipeline is restarted from a savepoint/checkpoint with an updated projection, the downstream sink never receives schema change events for the new/removed columns, leaving the sink schema stale and out of sync with the actual projected data.

This PR fixes the issue by teaching `SchemaCoordinator` to detect schema differences when a `CreateTableEvent` arrives for an already-known table. Instead of treating it as redundant, the coordinator computes the schema diff and emits the appropriate `AddColumnEvent`/`DropColumnEvent`/`AlterColumnTypeEvent` downstream. `PostTransformOperator` remains stateless: it is a pure mapping node.

## Brief change log

- `SchemaUtils.isSchemaChangeEventRedundant()`: a `CreateTableEvent` is now redundant only if the table exists AND the schema is identical (was: just an existence check)
- `SchemaCoordinator.deduceEvolvedSchemaChanges()`: when a `CreateTableEvent` arrives for an existing table in 1:1 routing, computes the schema diff via `SchemaMergingUtils.getSchemaDifference()` and returns diff events instead of forwarding the raw `CreateTableEvent`

**Note**: `isSchemaChangeEventRedundant` is also called by the Hudi and Paimon sink connectors. The behavioral change is a bugfix for those paths as well (they were silently dropping `CreateTableEvent`s with changed schemas), but no connector-specific code was modified.
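For readers unfamiliar with the idea, the redundancy check and diff step can be sketched roughly as below. This is a simplified, hypothetical model and not the Flink CDC implementation: the class `SchemaDiffSketch`, its methods, and the representation of a schema as an ordered map of column name to type name are all assumptions made for illustration, and events are rendered as plain strings rather than real `SchemaChangeEvent` objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Flink CDC.
public class SchemaDiffSketch {

    // A create-table event is treated as redundant only when the table is
    // already known AND the column set and types match.
    // Note: Map.equals ignores column order, which this sketch does not model.
    static boolean isCreateTableRedundant(Map<String, String> known,
                                          Map<String, String> incoming) {
        return known != null && known.equals(incoming);
    }

    // Compute the change events needed to move from the known schema to the
    // incoming one. Schemas map column name -> type name.
    static List<String> diff(Map<String, String> known, Map<String, String> incoming) {
        List<String> events = new ArrayList<>();
        // Columns absent from the incoming schema were dropped from the projection.
        for (String name : known.keySet()) {
            if (!incoming.containsKey(name)) {
                events.add("DropColumn(" + name + ")");
            }
        }
        // Columns that are new, or whose type changed, in the incoming schema.
        for (Map.Entry<String, String> e : incoming.entrySet()) {
            String oldType = known.get(e.getKey());
            if (oldType == null) {
                events.add("AddColumn(" + e.getKey() + " " + e.getValue() + ")");
            } else if (!oldType.equals(e.getValue())) {
                events.add("AlterColumnType(" + e.getKey() + ": "
                        + oldType + " -> " + e.getValue() + ")");
            }
        }
        return events;
    }

    public static void main(String[] args) {
        Map<String, String> known = new LinkedHashMap<>();
        known.put("id", "INT");
        known.put("name", "STRING");
        known.put("age", "INT");

        // Schema produced by the updated projection after restart.
        Map<String, String> incoming = new LinkedHashMap<>();
        incoming.put("id", "BIGINT");
        incoming.put("name", "STRING");
        incoming.put("email", "STRING");

        if (!isCreateTableRedundant(known, incoming)) {
            diff(known, incoming).forEach(System.out::println);
        }
    }
}
```

In this model, an identical schema yields an empty diff and is skipped as redundant, which mirrors the no-op case listed in the tests.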
## Verifying this change

3 new test methods across 3 files:

- `SchemaUtilsTest`: Redundancy checks for `CreateTableEvent` (non-existent table, same schema, extra columns, type diff, fewer columns)
- `SchemaMergingUtilsTest`: Diff scenarios for projection changes (add column, drop column, type change, swap columns, identical schemas)
- `SchemaEvolveTest`: End-to-end through `SchemaOperator` + `SchemaCoordinator` (add column, drop column, type change, identical schema as a no-op)

## Does this pull request potentially affect one of the following parts

- Dependencies: no
- The public API: no
- The runtime per-record code path: no (only affects schema change event processing)
- Anything that affects deployment or recovery: yes (changes how `CreateTableEvent` is handled after checkpoint restore)

## Documentation

- Does this pull request introduce a new feature? No (fixes existing behavior gap)
