Adamyuanyuan opened a new pull request, #10279:
URL: https://github.com/apache/seatunnel/pull/10279

   ### Purpose of this pull request
   
   When SeaTunnel runs on Flink in **STREAMING** mode with the Hive sink option 
`overwrite: true`, the final Hive partition/table directory can lose previously 
committed files and end up containing only a subset of the data (often only the 
files from the last checkpoint).
   
   ### Reproduction
   - Use a pipeline such as:
     - `env.job.mode = "STREAMING"`
     - Hive sink: `overwrite: true`
     - Constant partition (e.g. SQL transform adds `'2025-12-16' as pt`, so all 
records go to the same partition)
   - Observe job logs: every completed checkpoint triggers an aggregated 
commit, and the same target partition directory is deleted repeatedly before 
renaming/moving the new files.
   - Result: the partition directory is cleared on every checkpoint, so only 
the newest checkpoint’s files remain.
   
   ### Root Cause
   `overwrite: true` is normalized to `DataSaveMode.DROP_DATA`. In 
`HiveSinkAggregatedCommitter#commit(...)`, the implementation deleted the 
target table/partition directories **before every commit**. In Flink streaming, 
`commit()` is invoked after **every completed checkpoint**, so the delete step 
was executed repeatedly and wiped files committed by earlier checkpoints.
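
The effect can be illustrated with a toy model. This is only a sketch: `NaiveOverwriteCommitter` is a hypothetical class, the partition directory is modelled as an in-memory list, and none of it is SeaTunnel code. It just shows why "delete, then move files in" on every checkpoint leaves only the last checkpoint's files.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a committer that clears its target on every commit (hypothetical, not SeaTunnel code). */
final class NaiveOverwriteCommitter {
    // Stands in for the contents of the target partition directory.
    private final List<String> partitionDir = new ArrayList<>();

    /** Mimics the old behaviour: clear the target, then move the new files in. */
    void commit(List<String> filesFromCheckpoint) {
        partitionDir.clear();
        partitionDir.addAll(filesFromCheckpoint);
    }

    /** Returns a copy of the files currently in the modelled directory. */
    List<String> listFiles() {
        return new ArrayList<>(partitionDir);
    }
}
```

Calling `commit` once per checkpoint with that checkpoint's files discards everything written by earlier checkpoints, which matches the symptom described under Reproduction.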
   
   ### Fix
   Implemented overwrite semantics that are safe for streaming checkpoints:
   - Delete each target directory (table directory or partition directory) **at 
most once per job attempt**, and only when the commit contains actual files 
(skip deletion for empty commits).
   - Best-effort recovery protection: parse `checkpointId` from 
`transactionDir` (pattern like `.../T_xxx_<subtaskIndex>_<checkpointId>`). If 
the first checkpoint id seen by this committer is `> 1`, which usually indicates 
recovery from a previous checkpoint, skip deletion to avoid removing already 
committed data that matches the restored state.
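
The two rules above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `OverwriteOnceGuard`, `parseCheckpointId`, and `shouldDelete` are hypothetical names, the checkpoint id is assumed to be the text after the last underscore of `transactionDir`, and an unparsable id is assumed to mean "not a recovery".

```java
import java.util.HashSet;
import java.util.OptionalLong;
import java.util.Set;

/** Hypothetical guard sketching the overwrite rules described above; not the PR's actual code. */
final class OverwriteOnceGuard {
    // Directories already cleared during this job attempt.
    private final Set<String> deletedDirs = new HashSet<>();
    // First checkpoint id this committer has seen; null until one is parsed.
    private Long firstCheckpointId;

    /** Parses the trailing id from a path shaped like .../T_xxx_<subtaskIndex>_<checkpointId>. */
    static OptionalLong parseCheckpointId(String transactionDir) {
        if (transactionDir == null) {
            return OptionalLong.empty();
        }
        String trimmed = transactionDir.replaceAll("/+$", ""); // drop trailing slashes
        int idx = trimmed.lastIndexOf('_');
        if (idx < 0 || idx == trimmed.length() - 1) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(trimmed.substring(idx + 1)));
        } catch (NumberFormatException e) {
            return OptionalLong.empty(); // assumption: unparsable id is treated as unknown
        }
    }

    /** Returns true only when the caller should delete targetDir before moving files in. */
    boolean shouldDelete(String targetDir, String transactionDir, int fileCount) {
        OptionalLong id = parseCheckpointId(transactionDir);
        if (firstCheckpointId == null && id.isPresent()) {
            firstCheckpointId = id.getAsLong();
        }
        if (fileCount == 0) {
            return false; // empty commit: nothing to overwrite with
        }
        if (firstCheckpointId != null && firstCheckpointId > 1) {
            return false; // likely restored from a checkpoint: keep committed data
        }
        return deletedDirs.add(targetDir); // true only the first time per directory
    }
}
```

A committer would call `shouldDelete(...)` once per target directory in each commit and delete the directory only when it returns `true`. Because the set lives in memory, it resets when the job restarts, which is why the checkpoint-id check is needed as a second line of defence.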
   
   
   ### Does this PR introduce _any_ user-facing change?
   no
   
   ### How was this patch tested?
   Yes: unit tests, plus manual testing in our test environment.
   
   ### Check list
   
   * [ ] If your PR adds any new Jar binary package, please add a License 
Notice according to the
     [New License 
Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new 
feature. https://github.com/apache/seatunnel/tree/dev/docs
   * [ ] If necessary, please update `incompatible-changes.md` to describe the 
incompatibility caused by this PR.
   * [ ] If you are contributing the connector code, please check that the 
following files are updated:
     1. Update 
[plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties)
 and add new connector information in it
     2. Update the pom file of 
[seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
     3. Add ci label in 
[label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
     4. Add e2e testcase in 
[seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
     5. Update connector 
[plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)

