NestDream opened a new pull request, #28360:
URL: https://github.com/apache/flink/pull/28360

   ## What is the purpose of the change
   
   `NativeS3RecoverableFsDataOutputStream` can permanently leak a local temp 
file when a part upload fails at commit time.
   
   `closeForCommit()` sets `closed = true` and *then* calls 
`uploadCurrentPart()` to flush the final pending part. If `uploadPart()` throws 
there (for example an S3 5xx that survives the SDK's internal retries), the 
upload's only cleanup is skipped, and the stream's later `close()` 
short-circuits on its `if (!closed)` guard, so its cleanup never runs either. The single pending 
part's `s3-part-<uuid>` file is then orphaned in the shared `io.tmp.dirs` (one 
leaked file per affected stream) until the TaskManager is recycled. Under 
repeated commit failures these accumulate across the lifetime of the process.
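   
   The failure sequence above can be reproduced with a deliberately simplified 
model. This is a sketch, not the actual `NativeS3RecoverableFsDataOutputStream` 
code: the class name `LeakModelSketch`, the `PartUploader` interface and the 
exact method bodies are assumptions made for illustration. Only the ordering 
(flag set first, upload second, guarded `close()`) comes from the description.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Simplified, hypothetical model of the pre-fix behavior (not the Flink class). */
final class LeakModelSketch {

    /** Hypothetical stand-in for the S3 part upload call. */
    interface PartUploader {
        void uploadPart(Path partFile) throws IOException;
    }

    private final PartUploader uploader;
    private final Path tempFile;
    private boolean closed;

    LeakModelSketch(PartUploader uploader, Path tempFile) {
        this.uploader = uploader;
        this.tempFile = tempFile;
    }

    void closeForCommit() throws IOException {
        closed = true;                 // flag flips before the final flush
        uploader.uploadPart(tempFile); // if this throws, the delete below is skipped
        Files.delete(tempFile);
    }

    void close() throws IOException {
        if (!closed) {
            closed = true;
            Files.deleteIfExists(tempFile);
        }
        // Already marked closed: nothing runs, so a leftover temp file stays on disk.
    }
}
```

   With an uploader that always throws, calling `closeForCommit()` and then 
`close()` on this model leaves the temp file in place, which is the leak the 
change addresses.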
   
   The same per-part cleanup is also missing on the `write()` path, where a 
large part is flushed mid-stream. That case is not a permanent leak, because a 
later `close()` still reclaims the file; it is a robustness gap with the same 
root cause and is fixed the same way.
   
   While in here, three `Files.delete()` calls are replaced with 
`Files.deleteIfExists()`. One of them, in `close()`, is a real (if minor) 
intra-class TOCTOU: `write()` / `uploadCurrentPart()` run without the lock and 
can delete `currentTempFile`, so a concurrent cancellation `close()` could fall 
through its `exists()` check and then throw `NoSuchFileException` on the 
delete. The other two are hardening.
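   
   The difference between the two delete styles can be shown in isolation. The 
sketch below uses only `java.nio.file.Files`; the class and method names are 
hypothetical and the real stream code is not reproduced here.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper contrasting check-then-delete with deleteIfExists. */
final class TempFileDeleteSketch {

    /**
     * Check-then-act: another thread may remove the file after exists()
     * returns true, in which case Files.delete throws NoSuchFileException.
     */
    static void checkThenDelete(Path file) throws IOException {
        if (Files.exists(file)) {
            Files.delete(file);
        }
    }

    /**
     * Single call with no separate check: returns true if this call removed
     * the file and false if it was already gone.
     */
    static boolean safeDelete(Path file) throws IOException {
        return Files.deleteIfExists(file);
    }
}
```

   `safeDelete` does not throw for a missing file, so a cancellation `close()` 
that loses the race simply sees `false`.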
   
   ## Brief change log
   
     - Wrap the upload in `uploadCurrentPart()` in a `try`/`finally` so the 
temp file is always deleted, even when `uploadPart()` throws; a failed delete 
is caught and logged via `LOG.warn` so it can't mask the original upload 
`IOException`.
     - Replace the three `Files.delete()` calls (in `uploadCurrentPart()`, the 
`closeForCommit()` else-branch, and `close()`) with `Files.deleteIfExists()`, 
and drop the now-redundant `exists()` guard in `close()`.
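   
   The shape of the first change can be sketched as follows. This is an 
illustration under assumptions, not the patch itself: `UploadCleanupSketch` and 
`PartUploader` are hypothetical names, and `java.util.logging` stands in for 
the `LOG.warn` call so that the example is self-contained.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch of try/finally cleanup around a part upload. */
final class UploadCleanupSketch {

    private static final Logger LOG =
            Logger.getLogger(UploadCleanupSketch.class.getName());

    /** Hypothetical stand-in for the S3 part upload call. */
    interface PartUploader {
        void uploadPart(Path partFile) throws IOException;
    }

    static void uploadCurrentPart(PartUploader uploader, Path tempFile)
            throws IOException {
        try {
            uploader.uploadPart(tempFile);
        } finally {
            try {
                // Runs on success and on failure of the upload.
                Files.deleteIfExists(tempFile);
            } catch (IOException deleteFailure) {
                // Log only: throwing from finally would replace the upload exception.
                LOG.log(Level.WARNING,
                        "Could not delete temp file " + tempFile, deleteFailure);
            }
        }
    }
}
```

   Catching the delete failure inside the `finally` block is what keeps the 
original upload `IOException` visible to the caller.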
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - `closeForCommitUploadFailureDeletesTempFile` — drives the permanent-leak 
path. A small write (1024 bytes, under the 5 MB min part size) does not flush 
during `write()`, so the only `uploadPart()` happens at commit time; that 
upload is made to throw, and the test asserts no `s3-part-*` file is left 
behind. Fails without the fix.
     - `uploadPartFailureFromWriteDeletesTempFile` — drives the `write()`-path 
flush (a full 5 MB part) with a failing upload and asserts the temp file is 
cleaned up. Fails without the fix.
     - `closeForCommitIsIdempotentWhenTempFileMissing` — exercises the 
`closeForCommit()` else-branch (no pending bytes) with the temp file already 
gone. Throws `NoSuchFileException` without the fix; passes with 
`deleteIfExists`.
     - `closeForCommitSuccessDeletesTempFile` — happy-path control: a 
successful commit deletes the temp file. Passes with and without the fix.
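   
   A leak assertion of the kind described above could be built on a small 
directory scan. The helper below is a hypothetical sketch and is not taken from 
the PR's test code; it assumes the temp files match the glob `s3-part-*` and 
sit directly in the given directory.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical test helper: lists leftover part files in a temp directory. */
final class TempDirLeakCheckSketch {

    static List<Path> leftoverPartFiles(Path tmpDir) throws IOException {
        List<Path> leftovers = new ArrayList<>();
        // The glob is matched against each entry's file name.
        try (DirectoryStream<Path> entries =
                Files.newDirectoryStream(tmpDir, "s3-part-*")) {
            for (Path entry : entries) {
                leftovers.add(entry);
            }
        }
        return leftovers;
    }
}
```

   A test would point the stream at a fresh temp directory, trigger the failing 
upload, and then assert that `leftoverPartFiles` returns an empty list.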
   
   With the fix, all 4 pass. Without the fix (source reverted to pristine 
`master`, tests kept), the first 3 fail (2 leak assertions + 1 
`NoSuchFileException`) and the happy-path control passes. Also manually 
verified the happy path against real S3 (13 MB → genuine 3-part multipart 
upload, object committed, no local `s3-part-*` files left, no orphan multipart 
uploads).
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: yes
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [X] Yes. Claude was used as a coding assistant; I reviewed and verified 
all changes.

