Adamyuanyuan opened a new pull request, #9828:
URL: https://github.com/apache/seatunnel/pull/9828

   https://github.com/apache/seatunnel/issues/9826
   
   ### Purpose of this pull request
   
   Fix data loss in the Flink streaming file sink caused by a race between `prepareCommit` and subsequent writes, and by overly aggressive cleanup of the transaction directory.
   
   The underlying engine is Flink. When writing from Kafka to Hive in **streaming mode**, the sink does not work reliably, whereas in **batch mode** data is written to Hive normally.

   #### 1. Checkpoint Failure Issue
   When the write volume is large, writing to HDFS fails. The observed behavior at different throughputs is as follows:
   
   | Records per Second | Hive-Parquet | Hive-Text |
   | --- | --- | --- |
   | 5 records/sec | Normal | Normal |
   | 100 records/sec | Occasional failure (once every 8 minutes) | Normal |
   | 1000 records/sec | Fails (at a frequency similar to the 100 records/sec scenario) | Fails |
   
   <img width="2638" height="1496" alt="Image" src="https://github.com/user-attachments/assets/2a05365a-81db-4fbb-bbc4-84d408c021e0" />

   <img width="2554" height="1118" alt="Image" src="https://github.com/user-attachments/assets/4f0ea1fa-1edf-4f47-8cc3-5a5a7554b6da" />
   
   
   #### 2. Data Loss Issue
   + The **File Sink/Hive Sink** may lose data, or the commit may fail with a "source missing" error during the rename step.
   + We have also observed "already finished … skip" log messages from the idempotent code paths.
   
   
   ### Affected Modules
   Under the **Flink Streaming** scenario:  
   + All file write operations (including Hive write operations)  
   + Connectors: `connector-file`, `connector-hive`
   
   
   ### Root cause
   - After `prepareCommit`, writers may keep writing into the same transaction directory until `snapshotState` runs. The committer renames the listed files and then deletes the transaction directory, which can accidentally remove files intended for the next commit. In addition, the rename has no short visibility wait and no strict idempotency check.
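
The race described above can be reproduced in a small toy model. The sketch below is only an illustration on the local filesystem using `java.nio.file`; the class name `TxnRaceDemo`, the directory names `T_1` and `final`, and the file names are hypothetical and are not SeaTunnel or Hadoop APIs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Toy model of the race; all names here are hypothetical, not SeaTunnel APIs. */
public class TxnRaceDemo {

    /** Returns how many data files are destroyed by the recursive cleanup. */
    public static int lostFilesAfterCommit() {
        try {
            Path root = Files.createTempDirectory("txn-race");
            Path txnDir = Files.createDirectories(root.resolve("T_1"));
            Path finalDir = Files.createDirectories(root.resolve("final"));

            // prepareCommit: only the files present at this moment are listed.
            Path listed = Files.write(txnDir.resolve("part-0"),
                    "a".getBytes(StandardCharsets.UTF_8));
            List<Path> commitList = Collections.singletonList(listed);

            // Race: the writer still targets T_1 before the transaction is rotated.
            Files.write(txnDir.resolve("part-1"), "b".getBytes(StandardCharsets.UTF_8));

            // Committer: rename only the listed files ...
            for (Path file : commitList) {
                Files.move(file, finalDir.resolve(file.getFileName()));
            }

            // ... then delete the whole transaction directory, children first.
            List<Path> leftovers;
            try (Stream<Path> walk = Files.walk(txnDir)) {
                leftovers = walk.sorted(Comparator.reverseOrder())
                        .collect(Collectors.toList());
            }
            int lost = 0;
            for (Path p : leftovers) {
                if (Files.isRegularFile(p)) {
                    lost++; // a data file that was never committed
                }
                Files.delete(p);
            }
            return lost;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("files lost: " + lostFilesAfterCommit());
    }
}
```

In this model the late file `part-1` is never in the commit list, so the cleanup removes it without it ever reaching the target directory.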
   
   ### Proposal
   
   - The writer rotates to a new transaction immediately after `prepareCommit`; `snapshotState` uses a flag to skip a second rotation.
   - The committer deletes the transaction directory only when it is empty (ignoring hidden/system files); otherwise it logs a warning and skips the deletion.
   - `HadoopFileSystemProxy.renameFile` adds a short visibility wait and treats "target exists" as idempotent success; if the source is genuinely missing, it prints diagnostics.
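
The three points above can be sketched as follows. This is a minimal illustration, not the code in this PR: it uses `java.nio.file` on the local filesystem as a stand-in for the Hadoop `FileSystem`, and the names `SafeCommitSketch`, `Writer`, `deleteIfEmpty`, and `renameIdempotent`, the retry parameters, and the rule that names starting with `.` or `_` count as hidden are all assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: local-filesystem stand-ins, not the SeaTunnel or Hadoop API. */
public class SafeCommitSketch {

    /** Writer side: rotate right after prepareCommit; snapshotState skips a second rotation. */
    static class Writer {
        private long txnId = 1;
        private boolean rotated = false;

        String currentTxn() { return "T_" + txnId; }

        String prepareCommit() {
            String committing = currentTxn();
            txnId++;          // later writes go to the next transaction directory
            rotated = true;
            return committing;
        }

        void snapshotState() {
            if (rotated) { rotated = false; return; } // already rotated in prepareCommit
            txnId++;
        }
    }

    /** Assumed convention: names starting with '.' or '_' are hidden/system files. */
    static boolean isHidden(Path p) {
        String name = p.getFileName().toString();
        return name.startsWith(".") || name.startsWith("_");
    }

    /** Deletes the directory only if it has no visible entries; hidden ones are assumed to be plain files. */
    public static boolean deleteIfEmpty(Path txnDir) {
        try {
            List<Path> hidden = new ArrayList<>();
            try (DirectoryStream<Path> entries = Files.newDirectoryStream(txnDir)) {
                for (Path entry : entries) {
                    if (!isHidden(entry)) {
                        System.err.println("WARN: " + txnDir + " is not empty, skipping delete");
                        return false;
                    }
                    hidden.add(entry);
                }
            }
            for (Path h : hidden) Files.delete(h);
            Files.delete(txnDir);
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns true if this call moved the file, false if the target was already in place. */
    public static boolean renameIdempotent(Path src, Path dst, int attempts, long waitMillis) {
        try {
            for (int i = 0; i < attempts; i++) {
                if (Files.exists(dst)) return false;   // treat as already committed
                if (Files.exists(src)) {
                    if (dst.getParent() != null) Files.createDirectories(dst.getParent());
                    Files.move(src, dst);
                    return true;
                }
                Thread.sleep(waitMillis);              // short visibility wait, then re-check
            }
            throw new IllegalStateException("source missing, target absent: " + src + " -> " + dst);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Runs one scenario end to end and returns a compact summary string. */
    public static String demo() {
        try {
            Path root = Files.createTempDirectory("safe-commit");
            Path txn = Files.createDirectories(root.resolve("T_1"));
            Path src = Files.write(txn.resolve("part-0"), new byte[] {1});
            Path late = Files.write(txn.resolve("part-1"), new byte[] {2});
            Path dst = root.resolve("final").resolve("part-0");

            boolean first = renameIdempotent(src, dst, 3, 10);
            boolean second = renameIdempotent(src, dst, 3, 10); // replayed commit
            boolean deletedWithData = deleteIfEmpty(txn);       // part-1 is still there
            Files.delete(late);
            Files.write(txn.resolve(".part-0.crc"), new byte[] {0});
            boolean deletedWhenEmpty = deleteIfEmpty(txn);      // only a hidden file remains

            Writer w = new Writer();
            String committing = w.prepareCommit();
            w.snapshotState();
            return first + "," + second + "," + deletedWithData + "," + deletedWhenEmpty
                    + "," + committing + "->" + w.currentTxn();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Checking the target before the source means a replayed commit returns quietly instead of failing with "source missing", while the bounded wait keeps a genuinely missing source from blocking indefinitely.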
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Verified with large-scale testing in the test environment: no checkpoint failures and no data loss.
   
   <img width="3200" height="1662" alt="image" src="https://github.com/user-attachments/assets/6d52aa1a-427b-4609-924c-8aa47eee7cd8" />

   <img width="2682" height="1486" alt="image" src="https://github.com/user-attachments/assets/1f6a8181-5501-4edc-b3cb-b47173a039f4" />

   <img width="2632" height="1514" alt="image" src="https://github.com/user-attachments/assets/8587dea1-5794-4dd5-9903-94ce039d3f3e" />
   
   ### Check list
   
   * [ ] If your PR adds any new Jar binary package, please add a License Notice according to the [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new 
feature. https://github.com/apache/seatunnel/tree/dev/docs
   * [ ] If you are contributing the connector code, please check that the 
following files are updated:
     1. Update 
[plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties)
 and add new connector information in it
     2. Update the pom file of 
[seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
     3. Add ci label in 
[label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
     4. Add e2e testcase in 
[seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
     5. Update connector 
[plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)

