NestDream opened a new pull request, #28360:
URL: https://github.com/apache/flink/pull/28360
## What is the purpose of the change
`NativeS3RecoverableFsDataOutputStream` can permanently leak a local temp
file when a part upload fails at commit time.
`closeForCommit()` sets `closed = true` and *then* calls
`uploadCurrentPart()` to flush the final pending part. If `uploadPart()` throws
there (for example, an S3 5xx that survives the SDK's internal retries), the
`Files.delete()` that follows the upload in `uploadCurrentPart()`, which is the
temp file's only cleanup on this path, is skipped. The stream's later `close()`
then short-circuits on its `if (!closed)` guard and never deletes anything. The
pending part's `s3-part-<uuid>` file is orphaned in the shared `io.tmp.dirs`
(one leaked file per affected stream) until the TaskManager is recycled, and
under repeated commit failures these files accumulate over the lifetime of the
process.
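To make the ordering problem concrete, here is a heavily simplified, hypothetical
model of the pre-fix control flow. It is not the real
`NativeS3RecoverableFsDataOutputStream`: the class name `LeakyStreamModel` and the
always-failing `uploadPart()` stand-in are assumptions for illustration, and the
real stream also handles buffering, multipart state, and locking. The model only
reproduces the sequence described above: the flag is set, the upload throws, the
delete is skipped, and `close()` does nothing.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical, simplified model of the pre-fix ordering (not the real Flink class). */
class LeakyStreamModel {
    private final Path currentTempFile;
    private boolean closed = false;

    LeakyStreamModel(Path tempFile) {
        this.currentTempFile = tempFile;
    }

    // Stand-in for the S3 call: always fails, like a 5xx that outlived SDK retries.
    private void uploadPart(Path file) throws IOException {
        throw new IOException("simulated S3 5xx");
    }

    private void uploadCurrentPart() throws IOException {
        uploadPart(currentTempFile);
        // Only reached if the upload succeeds: this is the cleanup that gets skipped.
        Files.delete(currentTempFile);
    }

    void closeForCommit() throws IOException {
        closed = true;        // flag is set first...
        uploadCurrentPart();  // ...then the final pending part is flushed
    }

    void close() throws IOException {
        if (!closed) {        // short-circuits after a failed closeForCommit()
            Files.deleteIfExists(currentTempFile);
            closed = true;
        }
    }

    /** Returns true if the temp file survives a failed commit followed by close(). */
    static boolean leaksAfterFailedCommit() {
        try {
            Path tmp = Files.createTempFile("s3-part-", ".tmp");
            LeakyStreamModel stream = new LeakyStreamModel(tmp);
            try {
                stream.closeForCommit();
            } catch (IOException expected) {
                // The commit-time upload failure propagates to the caller.
            }
            stream.close();
            boolean leaked = Files.exists(tmp);
            Files.deleteIfExists(tmp); // tidy up after the demonstration
            return leaked;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("leaked = " + leaksAfterFailedCommit()); // prints "leaked = true"
    }
}
```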
The same per-part cleanup is also missing on the `write()` path, where a full
part is flushed mid-stream. That case is not a permanent leak, because a later
`close()` still reclaims the file, so it is a robustness gap rather than a leak;
it has the same root cause and is fixed the same way.
While here, three `Files.delete()` calls are also replaced with
`Files.deleteIfExists()`. One of them, in `close()`, fixes a real (if minor)
intra-class TOCTOU race: `write()` and `uploadCurrentPart()` run without the
lock and can delete `currentTempFile`, so a concurrent cancellation `close()`
could pass its `exists()` check, lose the file to that deletion, and then throw
`NoSuchFileException` from `Files.delete()`. The other two changes are
defensive hardening.
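A minimal illustration of the `Files.delete()` versus `Files.deleteIfExists()`
behaviour behind the `close()` change, using only `java.nio.file`. The class and
method names are hypothetical, and the concurrent deletion is simulated by
deleting the file before the second call rather than by a real racing thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

/** Hypothetical demo of check-then-delete versus a single tolerant delete. */
class DeleteSemanticsDemo {
    /** Pre-fix shape in close(): check, then delete. The two steps are not atomic. */
    static void checkThenDelete(Path p) throws IOException {
        if (Files.exists(p)) {
            // Another thread (e.g. write()/uploadCurrentPart()) may delete p right here.
            Files.delete(p); // throws NoSuchFileException if p has vanished
        }
    }

    /** Post-fix shape: one call that tolerates an already-missing file. */
    static boolean deleteTolerant(Path p) throws IOException {
        return Files.deleteIfExists(p); // false if the file was already gone
    }

    /** Returns true if Files.delete on a missing file throws NoSuchFileException. */
    static boolean deleteThrowsOnMissing() {
        try {
            Path p = Files.createTempFile("s3-part-", ".tmp");
            Files.delete(p); // simulate the concurrent deletion
            try {
                Files.delete(p);
                return false;
            } catch (NoSuchFileException expected) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns true if deleteIfExists on a missing file returns false without throwing. */
    static boolean deleteIfExistsToleratesMissing() {
        try {
            Path p = Files.createTempFile("s3-part-", ".tmp");
            Files.delete(p); // simulate the concurrent deletion
            return !deleteTolerant(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("delete throws on missing: " + deleteThrowsOnMissing());
        System.out.println("deleteIfExists tolerates missing: " + deleteIfExistsToleratesMissing());
    }
}
```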
## Brief change log
- Wrap the upload in `uploadCurrentPart()` in a `try`/`finally` so the
temp file is always deleted, even when `uploadPart()` throws; a failed delete
is caught and logged via `LOG.warn` so it can't mask the original upload
`IOException`.
- Replace the three `Files.delete()` calls (in `uploadCurrentPart()`, the
`closeForCommit()` else-branch, and `close()`) with `Files.deleteIfExists()`,
and drop the now-redundant `exists()` guard in `close()`.
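A sketch of the `try`/`finally` shape described in the first bullet, assuming a
hypothetical `PartUploader` interface in place of the real S3 client call. It
uses `java.util.logging` only to stay dependency-free; Flink itself logs through
SLF4J, so the real `LOG.warn` call looks different. The point is the structure:
the upload exception propagates unchanged, and a failing delete is logged rather
than thrown out of the `finally` block, where it would otherwise replace the
upload's `IOException`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Logger;

/** Hypothetical sketch of the cleanup pattern; not the actual Flink code. */
class PartUploadCleanupSketch {
    private static final Logger LOG = Logger.getLogger(PartUploadCleanupSketch.class.getName());

    /** Hypothetical stand-in for the S3 part upload. */
    interface PartUploader {
        void uploadPart(Path partFile) throws IOException;
    }

    static void uploadCurrentPart(PartUploader uploader, Path tempFile) throws IOException {
        try {
            uploader.uploadPart(tempFile);
        } finally {
            try {
                // Runs on success and on failure, so the temp file is never orphaned.
                Files.deleteIfExists(tempFile);
            } catch (IOException deleteFailure) {
                // Log and swallow so a cleanup problem cannot mask the upload exception.
                LOG.warning("Failed to delete temp part file " + tempFile + ": " + deleteFailure);
            }
        }
    }

    /** Runs a failing upload; returns {uploadExceptionPropagated, tempFileStillPresent}. */
    static boolean[] runFailingUpload() {
        try {
            Path tmp = Files.createTempFile("s3-part-", ".tmp");
            boolean propagated = false;
            try {
                uploadCurrentPart(f -> { throw new IOException("simulated S3 5xx"); }, tmp);
            } catch (IOException expected) {
                propagated = "simulated S3 5xx".equals(expected.getMessage());
            }
            return new boolean[] {propagated, Files.exists(tmp)};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = runFailingUpload();
        System.out.println("propagated=" + r[0] + ", leaked=" + r[1]);
    }
}
```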
## Verifying this change
This change added tests and can be verified as follows:
- `closeForCommitUploadFailureDeletesTempFile` — drives the permanent-leak
path. A small write (1024 bytes, under the 5 MB min part size) does not flush
during `write()`, so the only `uploadPart()` happens at commit time; that
upload is made to throw, and the test asserts no `s3-part-*` file is left
behind. Fails without the fix.
- `uploadPartFailureFromWriteDeletesTempFile` — drives the `write()`-path
flush (a full 5 MB part) with a failing upload and asserts the temp file is
cleaned up. Fails without the fix.
- `closeForCommitIsIdempotentWhenTempFileMissing` — exercises the
`closeForCommit()` else-branch (no pending bytes) with the temp file already
gone. Throws `NoSuchFileException` without the fix; passes with
`deleteIfExists`.
- `closeForCommitSuccessDeletesTempFile` — happy-path control: a
successful commit deletes the temp file. Passes with and without the fix.
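The leak tests described above come down to counting `s3-part-*` files in the
temp directory after a failed upload. The following is a plain-Java
approximation of that idea, not the PR's actual test code: it uses a fresh
temporary directory as a stand-in for `io.tmp.dirs`, a hypothetical
always-failing `failingUpload()`, and compares the pre-fix ordering (delete after
upload) with `finally`-based cleanup. For brevity the fixed variant does not
guard the delete itself, which the real change does.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

/** Hypothetical approximation of the leak assertions; not the PR's test code. */
class LeakAssertionSketch {
    /** Counts files in dir matching the s3-part-* glob. */
    static int countPartFiles(Path dir) throws IOException {
        int n = 0;
        try (DirectoryStream<Path> parts = Files.newDirectoryStream(dir, "s3-part-*")) {
            for (Path ignored : parts) {
                n++;
            }
        }
        return n;
    }

    /** Runs an always-failing upload and returns how many s3-part-* files remain. */
    static int leftoverAfterFailedUpload(boolean withFix) {
        try {
            Path ioTmpDir = Files.createTempDirectory("io-tmp-"); // stand-in for io.tmp.dirs
            Path part = Files.createFile(ioTmpDir.resolve("s3-part-" + UUID.randomUUID()));
            Files.write(part, new byte[1024]); // small write, well under one part
            try {
                if (withFix) {
                    try {
                        failingUpload(part);
                    } finally {
                        Files.deleteIfExists(part); // always runs
                    }
                } else {
                    failingUpload(part);
                    Files.delete(part); // skipped when the upload throws
                }
            } catch (IOException expectedUploadFailure) {
                // The upload failure reaches the caller in both variants.
            }
            int leftover = countPartFiles(ioTmpDir);
            Files.deleteIfExists(part); // remove the demo directory afterwards
            Files.delete(ioTmpDir);
            return leftover;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void failingUpload(Path part) throws IOException {
        throw new IOException("simulated S3 5xx for " + part.getFileName());
    }

    public static void main(String[] args) {
        System.out.println("without fix: " + leftoverAfterFailedUpload(false) + " leftover");
        System.out.println("with fix: " + leftoverAfterFailedUpload(true) + " leftover");
    }
}
```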
With the fix, all 4 pass. Without the fix (source reverted to pristine
`master`, tests kept), the 3 above fail (2 leak assertions + 1
`NoSuchFileException`) and the happy-path control passes. I also manually
verified the happy path against real S3: a 13 MB write produced a genuine
3-part multipart upload, the object was committed, and no local `s3-part-*`
files or orphaned multipart uploads were left behind.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: yes
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
---
##### Was generative AI tooling used to co-author this PR?
- [X] Yes. Claude was used as a coding assistant; I reviewed and verified
all changes.