dybyte opened a new pull request, #9459:
URL: https://github.com/apache/seatunnel/pull/9459

   ### Purpose of this pull request
   
   This PR adds support for upsert session mode in the MaxCompute sink.
   Previously, the sink supported only upload session mode, which can only 
insert data into MaxCompute.
   With this change, users can insert, update, and delete data through upsert 
sessions, which update or delete existing rows based on their primary key 
values.
   For reference, see [MaxCompute Upsert 
Session](https://github.com/aliyun/aliyun-odps-java-sdk/blob/release/0.51.x/docs/docs/api-reference/tunnel/UpsertSession.md).
   
   **Why add a PK-based locking mechanism?**
   In a multi-threaded environment, concurrent upsert operations targeting the 
same primary key (PK) can cause race conditions, leading to unexpected behavior 
or data inconsistency.
   According to the official documentation:
   
   "Due to the writing characteristics of the primary key table, we should 
carefully control the writing logic when writing to the same table (partition) 
concurrently. If there are multiple concurrent writes to the same primary key 
at the same time, unexpected behavior may occur. A common solution is to use 
the shuffle by pk operation to assign records with the same primary key to the 
same thread for writing."
   
   Based on this, we introduced a PK-based locking mechanism that uses a 
striped lock pool keyed by a hash of the primary key values, so that writes 
to the same primary key are serialized.
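
   As a rough illustration of the idea, a striped lock pool could look like the 
sketch below. The class and method names (`PkStripedLocks`, `stripeIndex`, 
`runLocked`) are hypothetical and are not taken from the connector code; the 
sketch only assumes that equal primary key values must always map to the same 
lock.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical striped lock pool; illustrative only, not the connector's actual class. */
final class PkStripedLocks {
    private final Lock[] stripes;

    PkStripedLocks(int stripeCount) {
        if (stripeCount <= 0) {
            throw new IllegalArgumentException("stripeCount must be positive");
        }
        stripes = new Lock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock();
        }
    }

    /** Maps primary key values to a stripe; equal key values always yield the same index. */
    int stripeIndex(Object... pkValues) {
        if (pkValues == null || pkValues.length == 0) {
            throw new IllegalArgumentException("primary key values are required");
        }
        for (Object value : pkValues) {
            if (value == null) {
                throw new IllegalArgumentException("primary key value must not be null");
            }
        }
        // floorMod keeps the index non-negative even when the hash is negative.
        return Math.floorMod(Arrays.hashCode(pkValues), stripes.length);
    }

    /** Runs the action while holding the lock of the stripe that owns the given key. */
    void runLocked(Runnable action, Object... pkValues) {
        Lock lock = stripes[stripeIndex(pkValues)];
        lock.lock();
        try {
            action.run();
        } finally {
            lock.unlock();
        }
    }
}
```

   Different keys may share a stripe, which costs some parallelism but keeps the 
number of locks bounded regardless of how many distinct keys are written.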
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes.
   
   This PR adds upsert and delete support for the MaxCompute connector, 
enabling users to perform these operations on MaxCompute tables. Additionally, 
it introduces a primary key based locking mechanism to handle concurrent upsert 
requests safely in multi-threaded scenarios, preventing race conditions and 
ensuring data consistency.
   
   Users can now run concurrent write operations through the connector against 
MaxCompute tables with primary keys.
   
   
   ### How was this patch tested?
   
   - Added unit tests to verify the upsert and delete functionalities.
   
   - Added multi-threaded test cases (testLockProcessWithSameId_MultiThreaded) 
to ensure that the primary key based lock mechanism works correctly and 
prevents race conditions when multiple threads try to upsert the same row 
concurrently.
   
   - Added negative test cases to ensure that invalid rows (e.g., null 
primary keys) throw the expected exceptions.
   
   - Tried to add an integration test for MaxCompute, but it is currently 
disabled due to limitations in the maxcompute-emulator:
   `@Disabled("maxcompute-emulator does not support upload session for now, we need to move to upsert session in MaxComputeWriter")`
   Full end-to-end tests will be enabled once local MaxCompute upsert 
session support is available.
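
   For readers unfamiliar with this kind of test, the multi-threaded check 
described above can be sketched roughly as follows. This is not the code of 
`testLockProcessWithSameId_MultiThreaded`; the class `SameKeyContentionCheck` 
is hypothetical, and a single `ReentrantLock` stands in for the lock that the 
connector would select for one primary key. The sketch counts how many threads 
are ever inside the guarded section at the same time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical helper; illustrative only, not part of the connector's test suite. */
final class SameKeyContentionCheck {

    /** Returns the largest number of threads observed inside the guarded section at once. */
    static int maxConcurrentWriters(int threads, int iterations) throws InterruptedException {
        ReentrantLock lockForKey = new ReentrantLock(); // stand-in for the lock of one PK
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);

        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                try {
                    start.await(); // release all workers together to maximize contention
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int i = 0; i < iterations; i++) {
                    lockForKey.lock();
                    try {
                        int now = inside.incrementAndGet();
                        maxSeen.accumulateAndGet(now, Math::max);
                        inside.decrementAndGet();
                    } finally {
                        lockForKey.unlock();
                    }
                }
            });
        }

        start.countDown();
        pool.shutdown();
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return maxSeen.get();
    }
}
```

   If the lock works, the returned maximum is 1; a value above 1 would mean two 
threads wrote under the same key at the same time.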
   
   ### Check list
   
   * [x] If your PR adds any new Jar binary package, please add a License 
Notice according to the
     [New License 
Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [x] If necessary, please update the documentation to describe the new 
feature. https://github.com/apache/seatunnel/tree/dev/docs
   * [ ] If you are contributing the connector code, please check that the 
following files are updated:
     1. Update 
[plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties)
 and add new connector information in it
     2. Update the pom file of 
[seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
     3. Add ci label in 
[label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
     4. Add e2e testcase in 
[seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
     5. Update connector 
[plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)

