This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 09aac02a55 fix: keep failed-upload cleanup inside the upload worker (#5249)
09aac02a55 is described below
commit 09aac02a55d55c6ed026e1e372355aaaf33b63d3
Author: Matthew B. <[email protected]>
AuthorDate: Sat Jun 6 18:29:47 2026 -0700
fix: keep failed-upload cleanup inside the upload worker (#5249)
### What changes were proposed in this PR?
**What Caused the Issue:**
`LargeBinaryOutputStream` looked up the S3 client twice: once in the
upload worker (correct), and again in `close()` during failed-upload
cleanup. When a test left a stream unclosed, Python's garbage collector
eventually ran `__del__` → `close()` → the second lookup. By then a
different test was active, so the cleanup hit that later test's `mock_s3`
and broke its `assert_called_once_with`.
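A minimal, self-contained sketch of the hazard, assuming nothing about the real pytexera modules: `_Manager`, `manager`, and `LeakyStream` are hypothetical stand-ins, and `unittest.mock.patch.object` plays the role of each test's monkey-patch. A reference cycle keeps the unclosed object alive until `gc.collect()`, so its `__del__` performs the client lookup while the second "test" has its own patch active.

```python
import gc
from unittest import mock


class _Manager:
    """Hypothetical stand-in for a module-level client factory."""

    def get_client(self):
        raise RuntimeError("no real S3 client in this sketch")


manager = _Manager()


class LeakyStream:
    """Hypothetical stream that looks up the client in its finalizer."""

    def __init__(self, key):
        self.key = key
        self._self_ref = self  # reference cycle: only the cyclic GC frees it

    def __del__(self):
        # The lookup happens whenever the collector runs, under whatever
        # patches are active at that moment, not when the object was made.
        manager.get_client().delete_object(Key=self.key)


def demo():
    was_enabled = gc.isenabled()
    gc.disable()  # make collection timing deterministic for the demo
    try:
        with mock.patch.object(manager, "get_client") as get_a:
            LeakyStream("obj-from-test-a")  # "test A" never closes it
        with mock.patch.object(manager, "get_client") as get_b:
            gc.collect()  # the finalizer fires while "test B" is active
        return get_a.return_value, get_b.return_value
    finally:
        if was_enabled:
            gc.enable()
```

In this sketch, test B's client mock records a `delete_object` call that test B never made, while test A's mock records none: the same shape of stray call that broke `assert_called_once_with` in the CI failure.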
**Proposed Fix:**
- Move `s3.delete_object(...)` from `_cleanup_failed_upload()` into the
upload worker, reusing the `s3` client already captured by the closure
that did the upload.
- Drop the `_cleanup_failed_upload()` method and its call from
`close()`; the worker now records the exception first and then performs
the cleanup itself.
- `close()` and `__del__` no longer call back into
`large_binary_manager`, so a finalizer firing under a later test's
monkey-patches cannot reach the wrong S3 client.
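The fixed control flow can be modeled in isolation. This is a sketch, not the pytexera code: `SketchStream` and its `get_client` parameter are hypothetical stand-ins for `LargeBinaryOutputStream` and `large_binary_manager._get_s3_client`, the queue-backed reader and bucket check are omitted, and the S3 client is a `MagicMock`. It mirrors the ordering in the diff: one client lookup inside the worker, the exception recorded under the lock, then a best-effort `delete_object` on that same client, with `close()` only re-raising.

```python
import io
import threading
from unittest import mock


class SketchStream:
    """Hypothetical, stripped-down model of the fixed control flow."""

    def __init__(self, get_client, bucket, key):
        self._get_client = get_client  # stand-in for the manager's factory
        self._bucket, self._key = bucket, key
        self._lock = threading.Lock()
        self._upload_exception = None
        self._thread = threading.Thread(target=self._worker)
        self._thread.start()

    def _worker(self):
        s3 = None
        try:
            s3 = self._get_client()  # the only client lookup
            s3.upload_fileobj(io.BytesIO(b""), self._bucket, self._key)
        except Exception as e:
            # Record the failure first so callers can see it immediately...
            with self._lock:
                self._upload_exception = e
            # ...then best-effort cleanup with the client captured above.
            if s3 is not None:
                try:
                    s3.delete_object(Bucket=self._bucket, Key=self._key)
                except Exception:
                    pass  # cleanup errors must not mask the upload error

    def close(self):
        self._thread.join()
        with self._lock:
            exc = self._upload_exception
        if exc is not None:
            # close() never touches the client factory, so a late finalizer
            # cannot reach a different (e.g. re-patched) client.
            raise IOError(f"Failed to complete upload: {exc}") from exc


def demo():
    client = mock.MagicMock()
    client.upload_fileobj.side_effect = RuntimeError("boom")
    client.delete_object.side_effect = RuntimeError("cleanup also fails")
    factory = mock.MagicMock(return_value=client)
    stream = SketchStream(factory, "bucket", "key")
    try:
        stream.close()
    except IOError as err:
        return err, factory, client
    return None, factory, client
```

Calling `demo()` yields an `IOError` chained to the upload error, a factory that was called exactly once, and a single `delete_object` call whose own failure was swallowed.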
### Any related issues, documentation, or discussions?
Closes: #5245
Follow-up to #4707; surfaced on the 3.12 leg of
https://github.com/apache/texera/actions/runs/26481776334/job/77980417021.
### How was this PR tested?
- Ran `ruff format` and `ruff check` over `amber/src/main/python` and
`amber/src/test/python` (clean).
- Existing tests in `test_large_binary_output_stream.py` still cover the
relevant paths: `test_close_handles_upload_error`,
`test_delete_object_failure_is_swallowed`, and
`test_write_after_upload_error_raises_error`.
- Restored `test_write_after_upload_error_raises_error` to its inline
form and removed the `_drained` helper; neither workaround is needed once
cleanup is confined to the upload worker.
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.7 in compliance with ASF
---------
Signed-off-by: Matthew B. <[email protected]>
---
.../pytexera/storage/large_binary_output_stream.py | 21 ++++++++++-----------
.../storage/test_large_binary_output_stream.py | 4 ++--
2 files changed, 12 insertions(+), 13 deletions(-)
diff --git a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
index 0cdf8a3679..1ca575dd42 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
@@ -153,14 +153,24 @@ class LargeBinaryOutputStream(IOBase):
if self._upload_thread is None:
def upload_worker():
+ s3 = None
try:
large_binary_manager._ensure_bucket_exists(self._bucket_name)
s3 = large_binary_manager._get_s3_client()
reader = _QueueReader(self._queue)
s3.upload_fileobj(reader, self._bucket_name,
self._object_key)
except Exception as e:
+ # Record the failure first so the next write() call can
+ # immediately raise, then best-effort clean up the object.
with self._lock:
self._upload_exception = e
+ if s3 is not None:
+ try:
+ s3.delete_object(
+ Bucket=self._bucket_name, Key=self._object_key
+ )
+ except Exception:
+ pass
finally:
self._upload_complete.set()
@@ -214,12 +224,10 @@ class LargeBinaryOutputStream(IOBase):
self._upload_thread.join()
self._upload_complete.wait()
- # Check for errors and cleanup if needed
with self._lock:
exception = self._upload_exception
if exception is not None:
- self._cleanup_failed_upload()
raise IOError(
f"Failed to complete upload: {exception}"
) from exception
@@ -228,15 +236,6 @@ class LargeBinaryOutputStream(IOBase):
# the second close() call on Python 3.13+.
super().close()
- def _cleanup_failed_upload(self):
- """Clean up a failed upload by deleting the S3 object."""
- try:
- s3 = large_binary_manager._get_s3_client()
- s3.delete_object(Bucket=self._bucket_name, Key=self._object_key)
- except Exception:
- # Ignore cleanup errors - we're already handling an upload failure
- pass
-
def __enter__(self):
"""Context manager entry."""
return self
diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
index 17725d9c66..e35f1400fd 100644
--- a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
+++ b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
@@ -248,8 +248,8 @@ class TestLargeBinaryOutputStream:
stream.close()
-class TestCleanupFailedUpload:
- """Direct unit tests for _cleanup_failed_upload's silent-swallow path."""
+class TestUploadWorkerCleanup:
+ """Direct unit tests for the upload worker's silent-swallow cleanup."""
@pytest.fixture
def large_binary(self):