CloverDew opened a new pull request, #10107: URL: https://github.com/apache/seatunnel/pull/10107
**Purpose of this pull request: Fixes https://github.com/apache/seatunnel/issues/9980**

### Problem

After implementing CDC schema evolution support in the Flink engine, several issues were identified:

1. The coordinator's state resides in the TaskManager's JVM; if a job fails and restarts, this state is easily lost.
2. Flink's proprietary implementation logic should not be placed in the public API `SupportSchemaEvolutionSinkWriter`. Furthermore, flush requires manual API implementation.
3. Multi-parallelism configuration is not supported.

### Solution

This PR makes some minor adjustments to the architecture:

- **BroadcastSchemaSinkOperator** is introduced to hold the response state durably.
- **Coordinator is only responsible for communication**: pure messenger, stateless.
- **Flushing is guaranteed by checkpoints, not manual implementation**

#### Key Changes:

**1. Enhanced LocalSchemaCoordinator**

- **JobId isolation**: Prevents multi-job interference using `Map<String, WeakReference<LocalSchemaCoordinator>>`
- **Pure messenger**: No persistent state, only temporary communication futures

**3. Streamlined BroadcastSchemaSinkOperator**

- **Idempotency**: Tracks `lastProcessedEpoch` using Flink state

**4. API Compliance (Addresses #9980)**

- **API separation**: `SupportSchemaEvolutionSinkWriter` contains only the generic `applySchemaChange()` method
- **Flink-specific logic moved**: All coordination logic moved from the API to the Flink translation layer

### Testing

- Verified MySQL CDC → MySQL sink schema evolution scenarios
- Confirmed no data loss during schema changes
- Tested multi-job concurrent execution

### Breaking Changes

None. This is a refactoring that maintains API compatibility while improving the internal implementation.

### Related Issues

- Fixes #9980: Move Flink-specific flush coordination from the API to the translation layer.
- To address the issue of state loss, an operator-held state mechanism is introduced.
- Multi-parallelism is now supported.

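
For reviewers, the JobId isolation and epoch-based idempotency listed under Key Changes can be pictured with the rough sketch below. It is not the code in this PR: `CoordinatorRegistry`, `EpochGuard`, `getOrCreate` and `tryAdvance` are hypothetical names, `LocalSchemaCoordinator` is reduced to a stand-in, and `lastProcessedEpoch` is a plain field here, whereas the PR keeps it in Flink state. The sketch also assumes that epochs only ever increase.

```java
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch only; not the implementation in this PR. */
public class SchemaCoordinatorSketch {

    /** Stand-in for the stateless coordinator (the real class has more to it). */
    static final class LocalSchemaCoordinator {
        final String jobId;

        LocalSchemaCoordinator(String jobId) {
            this.jobId = jobId;
        }
    }

    /** Hypothetical registry: one coordinator per job id, held weakly. */
    static final class CoordinatorRegistry {
        private final Map<String, WeakReference<LocalSchemaCoordinator>> byJob =
                new ConcurrentHashMap<>();

        synchronized LocalSchemaCoordinator getOrCreate(String jobId) {
            WeakReference<LocalSchemaCoordinator> ref = byJob.get(jobId);
            LocalSchemaCoordinator existing = (ref == null) ? null : ref.get();
            if (existing != null) {
                return existing;
            }
            // Either no entry yet, or the previous coordinator was garbage collected.
            LocalSchemaCoordinator created = new LocalSchemaCoordinator(jobId);
            byJob.put(jobId, new WeakReference<>(created));
            return created;
        }
    }

    /** Hypothetical idempotency guard; assumes monotonically increasing epochs. */
    static final class EpochGuard {
        private long lastProcessedEpoch = -1L; // Flink state in the PR, a field here

        /** Returns true only the first time a given epoch (or a later one) is seen. */
        boolean tryAdvance(long epoch) {
            if (epoch <= lastProcessedEpoch) {
                return false; // duplicate or stale schema change, skip it
            }
            lastProcessedEpoch = epoch;
            return true;
        }
    }

    public static void main(String[] args) {
        CoordinatorRegistry registry = new CoordinatorRegistry();
        LocalSchemaCoordinator a = registry.getOrCreate("job-a");
        LocalSchemaCoordinator b = registry.getOrCreate("job-b");
        System.out.println("same job reuses coordinator: " + (a == registry.getOrCreate("job-a")));
        System.out.println("different jobs are isolated: " + (a != b));

        EpochGuard guard = new EpochGuard();
        System.out.println("epoch 1 applied: " + guard.tryAdvance(1));
        System.out.println("epoch 1 replayed: " + guard.tryAdvance(1));
    }
}
```

Holding coordinators through `WeakReference` means the registry does not keep a finished job's coordinator alive, and keying by job id keeps two jobs in the same JVM from sharing one. The epoch check is what makes a schema change replayed after a restart a no-op.
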
### Check list

* [ ] If your PR adds any new Jar binary package, please add a License Notice according to the [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
* [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
* [ ] If necessary, please update `incompatible-changes.md` to describe the incompatibility caused by this PR.
* [ ] If you are contributing connector code, please check that the following files are updated:
  1. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and add the new connector information to it
  2. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
  3. Add a CI label in [label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
  4. Add an e2e test case in [seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
  5. Update the connector [plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
