This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5249-38f5ac5fb249e19b40438c0bf2d711f7fb5068f4
in repository https://gitbox.apache.org/repos/asf/texera.git
commit 09aac02a55d55c6ed026e1e372355aaaf33b63d3
Author: Matthew B. <[email protected]>
AuthorDate: Sat Jun 6 18:29:47 2026 -0700

fix: keep failed-upload cleanup inside the upload worker (#5249)

### What changes were proposed in this PR?

**What Caused the Issue:**
`LargeBinaryOutputStream` looked up the S3 client twice: once in the upload worker (correct), and again in `close()` during failed-upload cleanup. When a test left a stream unclosed, Python's GC eventually called `__del__` → `close()` → the second lookup. By then a different test was active, so the cleanup hit that test's `mock_s3` and broke its `assert_called_once_with`.

**Proposed Fix**
- Move `s3.delete_object(...)` from `_cleanup_failed_upload()` into the upload worker, reusing the `s3` client already captured by the closure that performed the upload.
- Drop the `_cleanup_failed_upload()` method and the call to it from `close()`; the worker now handles cleanup itself, immediately after recording the exception.
- `close()` and `__del__` no longer call back into `large_binary_manager`, so a finalizer firing under a later test's monkey-patches cannot reach the wrong S3 client.

### Any related issues, documentation, or discussions?

Closes: #5245
Follow-up to #4707; surfaced on the 3.12 leg of https://github.com/apache/texera/actions/runs/26481776334/job/77980417021.

### How was this PR tested?

- Ran `ruff format` and `ruff check` over `amber/src/main/python` and `amber/src/test/python` (clean).
- Existing tests in `test_large_binary_output_stream.py` still cover the relevant paths: `test_close_handles_upload_error`, `test_delete_object_failure_is_swallowed`, and `test_write_after_upload_error_raises_error`.
- Simplified `test_write_after_upload_error_raises_error` back to inline form and removed the `_drained` helper, since neither workaround is needed once cleanup is structurally contained.

### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.7 in compliance with ASF

---------

Signed-off-by: Matthew B. <[email protected]>
---
 .../pytexera/storage/large_binary_output_stream.py | 21 ++++++++++-----------
 .../storage/test_large_binary_output_stream.py     |  4 ++--
 2 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
index 0cdf8a3679..1ca575dd42 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
@@ -153,14 +153,24 @@ class LargeBinaryOutputStream(IOBase):
         if self._upload_thread is None:
 
             def upload_worker():
+                s3 = None
                 try:
                     large_binary_manager._ensure_bucket_exists(self._bucket_name)
                     s3 = large_binary_manager._get_s3_client()
                     reader = _QueueReader(self._queue)
                     s3.upload_fileobj(reader, self._bucket_name, self._object_key)
                 except Exception as e:
+                    # Record the failure first so the next write() call can
+                    # immediately raise, then best-effort clean up the object.
                     with self._lock:
                         self._upload_exception = e
+                    if s3 is not None:
+                        try:
+                            s3.delete_object(
+                                Bucket=self._bucket_name, Key=self._object_key
+                            )
+                        except Exception:
+                            pass
                 finally:
                     self._upload_complete.set()
 
@@ -214,12 +224,10 @@ class LargeBinaryOutputStream(IOBase):
                 self._upload_thread.join()
                 self._upload_complete.wait()
 
-                # Check for errors and cleanup if needed
                 with self._lock:
                     exception = self._upload_exception
 
                 if exception is not None:
-                    self._cleanup_failed_upload()
                     raise IOError(
                         f"Failed to complete upload: {exception}"
                     ) from exception
@@ -228,15 +236,6 @@ class LargeBinaryOutputStream(IOBase):
             # the second close() call on Python 3.13+.
             super().close()
 
-    def _cleanup_failed_upload(self):
-        """Clean up a failed upload by deleting the S3 object."""
-        try:
-            s3 = large_binary_manager._get_s3_client()
-            s3.delete_object(Bucket=self._bucket_name, Key=self._object_key)
-        except Exception:
-            # Ignore cleanup errors - we're already handling an upload failure
-            pass
-
     def __enter__(self):
         """Context manager entry."""
         return self
diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
index 17725d9c66..e35f1400fd 100644
--- a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
+++ b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
@@ -248,8 +248,8 @@ class TestLargeBinaryOutputStream:
         stream.close()
 
 
-class TestCleanupFailedUpload:
-    """Direct unit tests for _cleanup_failed_upload's silent-swallow path."""
+class TestUploadWorkerCleanup:
+    """Direct unit tests for the upload worker's silent-swallow cleanup."""
 
     @pytest.fixture
     def large_binary(self):
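The failure mode described under "What Caused the Issue" can be reproduced in miniature. This is a hypothetical sketch, not Texera code. `LegacyStream`, `_registry`, `get_s3_client`, and `leak_then_run_next_test` are illustrative names. A flag simulates the failed upload, and `unittest.mock.MagicMock` stands in for each test's `mock_s3`. Because this pre-fix shape resolves the client when `close()` runs rather than when the upload ran, a finalizer that fires after the next test has patched the lookup sends `delete_object` to that test's mock.

```python
import gc
from unittest import mock

# Hypothetical module-level lookup, standing in for
# large_binary_manager._get_s3_client(); tests patch it via _registry["s3"].
_registry = {"s3": None}


def get_s3_client():
    return _registry["s3"]


class LegacyStream:
    """Hypothetical pre-fix shape: failed-upload cleanup re-looks-up the client."""

    def __init__(self, bucket, key):
        self.bucket, self.key = bucket, key
        self.upload_failed = True  # simulate an upload that already failed
        self.closed = False

    def close(self):
        if self.closed:
            return
        self.closed = True
        if self.upload_failed:
            try:
                # Second lookup: resolved at close time, not upload time.
                get_s3_client().delete_object(Bucket=self.bucket, Key=self.key)
            except Exception:
                pass  # cleanup errors were swallowed

    def __del__(self):
        self.close()


def leak_then_run_next_test():
    _registry["s3"] = mock.MagicMock()  # "test 1" patches in its client
    leaked = LegacyStream("bucket", "obj")  # test 1 never closes it
    mock_b = mock.MagicMock()
    _registry["s3"] = mock_b  # "test 2" patches in its own client
    del leaked  # finalizer runs while test 2's patch is active
    gc.collect()  # CPython already ran __del__ on del; this covers other GCs
    return mock_b


if __name__ == "__main__":
    mock_b = leak_then_run_next_test()
    # Test 2 expected no stray calls, yet the leaked stream's cleanup hit its mock.
    print(mock_b.delete_object.call_args_list)
```

In CPython, `del` drops the last reference and runs `__del__` immediately. In the reported failure the finalizer ran later, during garbage collection, so the stray call landed in whichever test happened to be active. That test's own `assert_called_once_with` then saw an extra call.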
