dybyte opened a new pull request, #9459: URL: https://github.com/apache/seatunnel/pull/9459
### Purpose of this pull request

This PR adds support for the upsert session mode in the MaxCompute sink. Previously, the sink could only insert data into MaxCompute through an upload session. With this change, users can also insert, update, and delete data through an upsert session, which updates or deletes existing rows based on their primary key values. For reference, see [MaxCompute Upsert Session](https://github.com/aliyun/aliyun-odps-java-sdk/blob/release/0.51.x/docs/docs/api-reference/tunnel/UpsertSession.md).

**Why add a PK-based locking mechanism?**

In a multi-threaded environment, concurrent upsert operations that target the same primary key (PK) can race with each other, leading to unexpected behavior or data inconsistency. The official documentation states:

> "Due to the writing characteristics of the primary key table, we should carefully control the writing logic when writing to the same table (partition) concurrently. If there are multiple concurrent writes to the same primary key at the same time, unexpected behavior may occur. A common solution is to use the shuffle by pk operation to assign records with the same primary key to the same thread for writing."

Based on this, we introduced a PK-based locking mechanism that uses a striped lock pool keyed by a hash of the primary key values, so that concurrent writes to the same primary key are serialized.

### Does this PR introduce _any_ user-facing change?

Yes. This PR adds upsert and delete support to the MaxCompute connector, so users can perform these operations on MaxCompute tables. It also introduces a primary-key-based locking mechanism that handles concurrent upsert requests safely in multi-threaded scenarios, preventing race conditions on the same key and keeping the data consistent. As a result, users can rely on the connector for concurrent write operations to MaxCompute tables with primary keys.

### How was this patch tested?

- Added unit tests to verify the upsert and delete functionality.
- Added multi-threaded test cases (`testLockProcessWithSameId_MultiThreaded`) to verify that the primary-key-based lock mechanism works correctly and prevents race conditions when multiple threads try to upsert the same row concurrently.
- Covered negative cases to ensure that invalid rows (e.g., rows with null primary keys) throw proper exceptions.
- Tried to add an integration test for MaxCompute, but it is currently disabled because of limitations in the maxcompute-emulator:
  `@Disabled("maxcompute-emulator does not support upload session for now, we need to move to upsert session in MaxComputeWriter")`
  Full end-to-end tests will be enabled once the local emulator supports MaxCompute upsert sessions.

### Check list

* [x] If any new Jar binary package adding in your PR, please add License Notice according to [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
* [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
* [ ] If you are contributing the connector code, please check that the following files are updated:
   1. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and add new connector information in it
   2. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
   3. Add ci label in [label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
   4. Add e2e testcase in [seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
   5. Update connector [plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)
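For reviewers who want to see the striped-lock idea from the *Purpose* section in isolation, here is a minimal sketch. It is not the connector's code: the class name `PkStripedLockPool`, the stripe count, and hashing the PK values with `List.hashCode()` are assumptions made for illustration, and the real MaxCompute write (an upsert or delete through the upsert session) is stood in for by a plain `Runnable`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical striped lock pool keyed by a hash of primary-key values.
 * Illustrative sketch only; not the MaxCompute connector's actual class.
 */
final class PkStripedLockPool {
    private final ReentrantLock[] stripes;

    PkStripedLockPool(int stripeCount) {
        if (stripeCount <= 0) {
            throw new IllegalArgumentException("stripeCount must be positive");
        }
        stripes = new ReentrantLock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock();
        }
    }

    /**
     * Returns the lock guarding the given PK. List.hashCode() is defined element by
     * element, so equal PK values always map to the same stripe. Different PKs may
     * share a stripe; that only adds contention, never breaks correctness.
     */
    ReentrantLock lockFor(List<?> pkValues) {
        if (pkValues == null || pkValues.isEmpty()) {
            throw new IllegalArgumentException("primary key values must not be empty");
        }
        for (Object value : pkValues) {
            if (value == null) {
                throw new IllegalArgumentException("primary key values must not be null");
            }
        }
        return stripes[Math.floorMod(pkValues.hashCode(), stripes.length)];
    }

    /** Runs one write (e.g. an upsert or delete) while holding the PK's stripe lock. */
    void withPkLock(List<?> pkValues, Runnable write) {
        ReentrantLock lock = lockFor(pkValues);
        lock.lock();
        try {
            write.run();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PkStripedLockPool pool = new PkStripedLockPool(16);
        int[] counter = {0};
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            writers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    // Every thread targets the same PK, so these writes are serialized.
                    pool.withPkLock(Arrays.asList(42L, "cn"), () -> counter[0]++);
                }
            });
            writers[t].start();
        }
        for (Thread w : writers) {
            w.join();
        }
        System.out.println(counter[0]); // prints 4000: no lost updates
    }
}
```

Striping keeps memory fixed regardless of how many distinct keys flow through the sink, at the cost of occasional contention between unrelated keys whose hashes land in the same stripe. The "shuffle by pk" approach quoted from the MaxCompute documentation avoids locking altogether by routing each key to a single writer thread.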
