CloverDew opened a new pull request, #10107:
URL: https://github.com/apache/seatunnel/pull/10107

   **Purpose of this pull request: Fixes 
https://github.com/apache/seatunnel/issues/9980**
   
   ### Problem
   
   After implementing CDC schema evolution support in Flink engine, several 
issues were identified:
   
   1. The coordinator's state resides in the TaskManager's JVM; if a job fails 
and restarts, this state is easily lost.
   2. Flink-specific implementation logic should not live in the public API 
`SupportSchemaEvolutionSinkWriter`. In addition, flushing has to be implemented 
manually through the API.
   3. Multi-parallelism configuration is not supported.
   
   ### Solution
   
   This PR makes some minor adjustments to the architecture:
   
   - **BroadcastSchemaSinkOperator** is introduced to persist the response 
state.
   - **Coordinator is only responsible for communication**: it is a pure, 
stateless messenger.
   - **Flushing is guaranteed by checkpoints, not by a manual implementation**
   
   #### Key Changes:
   
   **1. Enhanced LocalSchemaCoordinator**
   
   - **JobId isolation**: Prevents multi-job interference using `Map<String, 
WeakReference<LocalSchemaCoordinator>>`
   - **Pure messenger**: No persistent state, only temporary communication 
futures
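
A minimal sketch of the job-scoped, stateless coordinator described above, assuming one coordinator per job ID and schema changes identified by a numeric epoch. `CoordinatorSketch`, `forJob`, `request` and `acknowledge` are hypothetical names chosen for illustration; they are not the PR's actual `LocalSchemaCoordinator` API.

```java
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for LocalSchemaCoordinator; not the PR's actual class.
final class CoordinatorSketch {
    // One entry per job ID, so concurrent jobs in the same JVM never share a coordinator.
    // Weak references let a coordinator be collected once nothing else holds it.
    private static final Map<String, WeakReference<CoordinatorSketch>> INSTANCES =
            new ConcurrentHashMap<>();

    // Only in-flight futures are kept (keyed by an assumed epoch); nothing is persisted.
    private final Map<Long, CompletableFuture<Boolean>> pending = new ConcurrentHashMap<>();

    private CoordinatorSketch() {}

    static CoordinatorSketch forJob(String jobId) {
        // The holder keeps a strong reference until the method returns.
        CoordinatorSketch[] holder = new CoordinatorSketch[1];
        INSTANCES.compute(jobId, (id, ref) -> {
            CoordinatorSketch existing = (ref == null) ? null : ref.get();
            if (existing != null) {
                holder[0] = existing;
                return ref;
            }
            holder[0] = new CoordinatorSketch();
            return new WeakReference<>(holder[0]);
        });
        return holder[0];
    }

    // Registers (or returns) the future that completes when the change is acknowledged.
    CompletableFuture<Boolean> request(long epoch) {
        return pending.computeIfAbsent(epoch, e -> new CompletableFuture<>());
    }

    // Completes and discards the future; the coordinator keeps no record afterwards.
    void acknowledge(long epoch, boolean success) {
        CompletableFuture<Boolean> future = pending.remove(epoch);
        if (future != null) {
            future.complete(success);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Because the sketch drops each future as soon as it is acknowledged, anything that must survive a restart has to live elsewhere, which matches the PR's decision to keep durable state in the operator.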
   
   **3. Streamlined BroadcastSchemaSinkOperator**
   
   - **Idempotency**: tracks `lastProcessedEpoch` using Flink state
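
The idempotency check can be sketched as follows, assuming schema-change epochs increase monotonically. A plain field stands in for Flink-managed state so the example runs without Flink on the classpath; `EpochGuardSketch`, `snapshot` and `restore` are hypothetical names, not the operator's real methods.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; in the real operator the epoch would live in Flink state.
final class EpochGuardSketch {
    // Stand-in for the checkpointed lastProcessedEpoch value.
    private long lastProcessedEpoch = -1L;
    private final List<String> applied = new ArrayList<>();

    // Applies a change only if its epoch is newer than the last one processed.
    boolean process(long epoch, String change) {
        if (epoch <= lastProcessedEpoch) {
            return false; // duplicate or replayed event: skip it
        }
        applied.add(change);
        lastProcessedEpoch = epoch;
        return true;
    }

    // Simulates what a checkpoint would persist.
    long snapshot() {
        return lastProcessedEpoch;
    }

    // Simulates state restoration after a restart.
    void restore(long epoch) {
        lastProcessedEpoch = epoch;
    }

    List<String> applied() {
        return applied;
    }
}
```

After a simulated restart, calling `restore(snapshot)` on a fresh instance makes a replayed event with an already-seen epoch a no-op, which is the behaviour the bullet above describes.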
   
   **4. API Compliance (Addresses #9980)**
   
   - **API separation**: `SupportSchemaEvolutionSinkWriter` contains only 
generic `applySchemaChange()` method
   - **Flink-specific logic moved**: All coordination logic moved from API to 
Flink translation layer
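
The separation described above can be illustrated with a simplified sketch. The interface below is a hypothetical stand-in with a `String` payload, not the real `SupportSchemaEvolutionSinkWriter` signature, and `EngineAdapterSketch` is an assumed name for whatever the translation layer uses to wrap the writer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the engine-agnostic writer contract.
interface SchemaEvolvingWriterSketch {
    void applySchemaChange(String schemaChange);
}

// Connector side: knows how to apply a change, nothing about any engine.
final class RecordingWriter implements SchemaEvolvingWriterSketch {
    final List<String> applied = new ArrayList<>();

    @Override
    public void applySchemaChange(String schemaChange) {
        applied.add(schemaChange);
    }
}

// Engine side: a hypothetical translation-layer adapter that owns all coordination.
final class EngineAdapterSketch {
    private final SchemaEvolvingWriterSketch writer;
    private final List<String> events = new ArrayList<>();

    EngineAdapterSketch(SchemaEvolvingWriterSketch writer) {
        this.writer = writer;
    }

    void onSchemaChange(String schemaChange) {
        // Engine-specific coordination would happen here, outside the public API.
        events.add("coordinate:" + schemaChange);
        writer.applySchemaChange(schemaChange);
        events.add("applied:" + schemaChange);
    }

    List<String> events() {
        return events;
    }
}
```

With this shape, a connector implements a single generic method, and a different engine could supply its own adapter without touching the writer interface.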
   
   ### Testing
   
   - Verified MySQL CDC → MySQL sink schema evolution scenarios
   - Confirmed no data loss during schema changes  
   - Tested multi-job concurrent execution
   
   ### Breaking Changes
   
   None. This is a refactoring that maintains API compatibility while improving 
internal implementation.
   
   ### Related Issues
   
   - Fixes #9980: Move Flink-specific flush coordination from API to 
translation layer.
   - Addresses state loss by introducing an operator-held state mechanism.
   - Multi-parallelism is now supported.
   
   ### Check list
   
   * [ ] If your PR adds any new Jar binary package, please add a License Notice according to the
     [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new 
feature. https://github.com/apache/seatunnel/tree/dev/docs
   * [ ] If necessary, please update `incompatible-changes.md` to describe the 
incompatibility caused by this PR.
   * [ ] If you are contributing connector code, please check that the 
following files are updated:
     1. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and add the new connector information to it
     2. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
     3. Add a CI label in [label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
     4. Add an e2e test case in [seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
     5. Update the connector [plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)

