Samrat002 opened a new pull request, #28268:
URL: https://github.com/apache/flink/pull/28268
## What is the purpose of the change
`NativeS3RecoverableWriter.recover()` silently discarded the sub-part-size
tail that `persist()` had durably uploaded to S3 as a side object. After a
crash-and-restore cycle, any bytes written since the last full-part boundary
were permanently lost, violating Flink's exactly-once guarantee.
This pull request fixes the data loss by downloading the side object during
`recover()` and seeding the resumed output stream with those bytes before
accepting further writes.
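
The failure mode can be illustrated with a small in-memory model. This is a minimal sketch, not the Flink implementation: the class `TailRecoverySketch`, its `Snapshot` type, the `persist`/`recover` methods, and the 4-byte `PART_SIZE` are all hypothetical stand-ins chosen for illustration. It only shows why skipping the side object on recovery drops the bytes written after the last full-part boundary.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical in-memory model of the tail-loss bug; not Flink's actual classes.
class TailRecoverySketch {
    // Tiny part size for illustration only; real multipart parts are far larger.
    static final int PART_SIZE = 4;

    /** What a persist() call leaves behind: full parts plus a side object for the tail. */
    static final class Snapshot {
        final byte[] uploadedParts;
        final byte[] sideObject; // null when the data ends exactly on a part boundary

        Snapshot(byte[] uploadedParts, byte[] sideObject) {
            this.uploadedParts = uploadedParts;
            this.sideObject = sideObject;
        }
    }

    static Snapshot persist(byte[] written) {
        int full = (written.length / PART_SIZE) * PART_SIZE;
        byte[] parts = Arrays.copyOfRange(written, 0, full);
        byte[] tail = full == written.length
                ? null
                : Arrays.copyOfRange(written, full, written.length);
        return new Snapshot(parts, tail);
    }

    /** Returns the bytes the resumed stream accounts for before accepting new writes. */
    static byte[] recover(Snapshot snapshot, boolean seedTail) {
        ByteArrayOutputStream resumed = new ByteArrayOutputStream();
        resumed.write(snapshot.uploadedParts, 0, snapshot.uploadedParts.length);
        if (seedTail && snapshot.sideObject != null) {
            // Fixed behaviour: seed the resumed stream with the persisted tail.
            resumed.write(snapshot.sideObject, 0, snapshot.sideObject.length);
        }
        return resumed.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = "abcdefghij".getBytes(StandardCharsets.US_ASCII);
        Snapshot snapshot = persist(data);
        System.out.println("without seeding: " + recover(snapshot, false).length + " bytes");
        System.out.println("with seeding:    " + recover(snapshot, true).length + " bytes");
    }
}
```

With ten bytes written and a four-byte part size, the unseeded path resumes with eight bytes and the seeded path with all ten.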
## Brief change log
- **`NativeS3RecoverableWriter`**: `recover()` now reads
`incompleteObjectName()` / `incompleteObjectLength()` from the recoverable; if
non-null, downloads the side object to a local temp file via `getObject()` and
passes it as a seed tail to the resumed stream. A `downloadIncompleteTail()`
helper handles length validation and cleans up the local file on any failure
(suppressing secondary cleanup exceptions). The side object itself is
intentionally left in S3 so that re-recovery from the same checkpoint remains
correct; deletion remains the responsibility of `cleanupRecoverableState()`.
- **`NativeS3RecoverableFsDataOutputStream`**: Added a new 8-argument resume
constructor accepting an optional `seedTailFile` / `seedTailLength`. A new
`adoptSeedTailFile()` helper opens the file in append mode and sets
`currentPartSize` to the seed length so the stream's position accounting is
correct from the start. The existing 7-argument constructor delegates to the
new one with `null, 0L`.
- **`InMemoryNativeS3Operations`** (new test class): Extracted the ad-hoc
`FakeS3` inner class into a reusable, standalone test double. Exposes
`storedObjects`, `committedObjects`, and `openMultipartUploads` as public
fields for direct inspection and corruption in tests. `getObject()` reads from
both maps; `deleteObject()` checks both maps; `commitMultiPartUpload()`
validates declared vs. assembled length.
- **`NativeS3RecoverableWriterRecoveryTest`** (new test class): Five unit
tests covering the fixed path and edge cases (see *Verifying this change*
below).
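
The download-and-seed flow described in the first two items can be sketched roughly as follows. This is an illustrative approximation under stated assumptions, not the code in this PR: the `store` map stands in for S3, and the signatures of `downloadIncompleteTail` and the `ResumedStream` class are hypothetical. It shows length validation, local-file cleanup with suppressed secondary exceptions, append-mode adoption of the seed file, and the side object being left in place.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the recovery flow; names and signatures are assumptions.
class SeedTailSketch {
    /** Stand-in for the S3 bucket: object key -> object bytes. */
    static final Map<String, byte[]> store = new HashMap<>();

    /** Copies the side object to a local temp file, validating its length. */
    static Path downloadIncompleteTail(String key, long expectedLength) throws IOException {
        Path local = Files.createTempFile("s3-tail-", ".tmp");
        try {
            byte[] body = store.get(key);
            if (body == null) {
                throw new IOException("Side object missing: " + key);
            }
            if (body.length != expectedLength) {
                throw new IOException("Side object " + key + " has length "
                        + body.length + ", expected " + expectedLength);
            }
            Files.write(local, body);
            return local; // the side object itself is deliberately not deleted
        } catch (IOException | RuntimeException e) {
            try {
                Files.deleteIfExists(local);
            } catch (IOException cleanupFailure) {
                e.addSuppressed(cleanupFailure); // keep the primary failure visible
            }
            throw e;
        }
    }

    /** Resumed stream that adopts the seed file and continues appending to it. */
    static final class ResumedStream implements AutoCloseable {
        private final OutputStream out;
        private long currentPartSize;

        ResumedStream(Path seedTailFile, long seedTailLength) throws IOException {
            this.out = Files.newOutputStream(
                    seedTailFile, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
            this.currentPartSize = seedTailLength; // position accounting starts at the seed
        }

        void write(byte[] bytes) throws IOException {
            out.write(bytes);
            currentPartSize += bytes.length;
        }

        long currentPartSize() {
            return currentPartSize;
        }

        @Override
        public void close() throws IOException {
            out.close();
        }
    }

    public static void main(String[] args) throws IOException {
        store.put("side-object", "ij".getBytes(StandardCharsets.US_ASCII));
        Path seed = downloadIncompleteTail("side-object", 2);
        try (ResumedStream stream = new ResumedStream(seed, 2)) {
            stream.write("kl".getBytes(StandardCharsets.US_ASCII));
            System.out.println("current part size: " + stream.currentPartSize());
        }
        System.out.println(new String(Files.readAllBytes(seed), StandardCharsets.US_ASCII));
        Files.deleteIfExists(seed);
    }
}
```

Leaving the entry in `store` untouched mirrors the stated design choice: a second recovery from the same checkpoint can download the same tail again.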
## Verifying this change
This change is verified by the unit tests in `NativeS3RecoverableWriterRecoveryTest`, which reproduce the bug and confirm the fix.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no) no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (yes / no) no
- The serializers: (yes / no / don't know) no
- The runtime per-record code paths (performance sensitive): (yes / no /
don't know) no
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
no
- The S3 file system connector: (yes / no / don't know) yes
## Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs
/ not documented)
---
##### Was generative AI tooling used to co-author this PR?
<!--
If generative AI tooling has been used in the process of authoring this PR, please
change the checkbox below to `[X]` followed by the name of the tool, and uncomment the
"Generated-by" line. See the ASF Generative Tooling Guidance for details:
https://www.apache.org/legal/generative-tooling.html
You are responsible for the quality and correctness of every change in this PR
regardless of the tooling used. Low-effort AI-generated PRs will be closed. See
AGENTS.md for the full guidance.
-->
- [ ] Yes (please specify the tool below)
<!--
Generated-by: [Tool Name and Version]
-->