This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 09aac02a55 fix: keep failed-upload cleanup inside the upload worker (#5249)
09aac02a55 is described below

commit 09aac02a55d55c6ed026e1e372355aaaf33b63d3
Author: Matthew B. <[email protected]>
AuthorDate: Sat Jun 6 18:29:47 2026 -0700

    fix: keep failed-upload cleanup inside the upload worker (#5249)
    
    ### What changes were proposed in this PR?
    **What Caused the Issue:**
    LargeBinaryOutputStream looked up the S3 client twice: once in the
    upload worker (correct), and once again in close() during failed-upload
    cleanup. When a test left a stream unclosed, Python's GC eventually
    called __del__ → close() → the second lookup, but by then a different
    test was active, so the cleanup hit the wrong test's mock_s3 and broke
    its assert_called_once_with.
    
    **Proposed Fix:**
    - Move `s3.delete_object(...)` from `_cleanup_failed_upload()` into the
    upload worker, reusing the `s3` client already captured by the closure
    that did the upload.
    - Drop the `_cleanup_failed_upload()` method and the call to it from
    `close()`; the worker now handles cleanup itself, right after recording
    the exception.
    - `close()` and `__del__` no longer call back into
    `large_binary_manager`, so a finalizer firing under a later test's
    monkey-patches cannot reach the wrong S3 client.
    ### Any related issues, documentation, or discussions?
    Closes: #5245. Follow-up to #4707; surfaced on the 3.12 leg of
    https://github.com/apache/texera/actions/runs/26481776334/job/77980417021.
    ### How was this PR tested?
    - Ran `ruff format` and `ruff check` over `amber/src/main/python` and
    `amber/src/test/python` (clean).
    - Existing tests in `test_large_binary_output_stream.py` still cover the
    relevant paths: `test_close_handles_upload_error`,
    `test_delete_object_failure_is_swallowed`, and
    `test_write_after_upload_error_raises_error`.
    - Simplified `test_write_after_upload_error_raises_error` back to inline
    form and removed the `_drained` helper, both no longer needed once
    cleanup is structurally contained.
    ### Was this PR authored or co-authored using generative AI tooling?
    Co-authored with Claude Opus 4.7 in compliance with ASF
    
    ---------
    
    Signed-off-by: Matthew B. <[email protected]>
---
 .../pytexera/storage/large_binary_output_stream.py  | 21 ++++++++++-----------
 .../storage/test_large_binary_output_stream.py      |  4 ++--
 2 files changed, 12 insertions(+), 13 deletions(-)
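
The fix described in the commit message above can be illustrated with a minimal, self-contained sketch. This is not the Texera code: `FakeManager`, `start_upload`, and the `errors` list are hypothetical names introduced only for illustration, and `MagicMock` objects stand in for real S3 clients. The point it tries to show is that a worker which captures its client once, and does its own cleanup with that same client, is unaffected when the manager's client is swapped later (as a subsequent test's monkey-patch would do).

```python
import threading
from unittest.mock import MagicMock


class FakeManager:
    """Hypothetical stand-in for large_binary_manager; not the real module."""

    def __init__(self, client):
        self.client = client

    def get_s3_client(self):
        return self.client


def start_upload(manager, bucket, key, errors):
    """Run an upload in a worker thread that owns its own failure cleanup."""

    def upload_worker():
        s3 = None
        try:
            s3 = manager.get_s3_client()  # looked up exactly once
            s3.upload_fileobj(None, bucket, key)
        except Exception as e:
            errors.append(e)  # record the failure first
            if s3 is not None:
                try:
                    # Reuse the client captured above; no second lookup.
                    s3.delete_object(Bucket=bucket, Key=key)
                except Exception:
                    pass  # best-effort cleanup, errors are swallowed

    thread = threading.Thread(target=upload_worker)
    thread.start()
    return thread


# The first "test" installs a client whose upload always fails.
first_client = MagicMock()
first_client.upload_fileobj.side_effect = RuntimeError("upload failed")
manager = FakeManager(first_client)

errors = []
worker = start_upload(manager, "bucket", "key", errors)
worker.join()

# A later "test" swaps in its own mock after the upload has finished.
second_client = MagicMock()
manager.client = second_client
```

Because the cleanup runs inside the worker, nothing that executes after `worker.join()` (a `close()` call or a finalizer, for instance) needs to ask the manager for a client again, so the second mock is never touched.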

diff --git a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
index 0cdf8a3679..1ca575dd42 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
@@ -153,14 +153,24 @@ class LargeBinaryOutputStream(IOBase):
         if self._upload_thread is None:
 
             def upload_worker():
+                s3 = None
                 try:
                     large_binary_manager._ensure_bucket_exists(self._bucket_name)
                     s3 = large_binary_manager._get_s3_client()
                     reader = _QueueReader(self._queue)
                     s3.upload_fileobj(reader, self._bucket_name, self._object_key)
                 except Exception as e:
+                    # Record the failure first so the next write() call can
+                    # immediately raise, then best-effort clean up the object.
                     with self._lock:
                         self._upload_exception = e
+                    if s3 is not None:
+                        try:
+                            s3.delete_object(
+                                Bucket=self._bucket_name, Key=self._object_key
+                            )
+                        except Exception:
+                            pass
                 finally:
                     self._upload_complete.set()
 
@@ -214,12 +224,10 @@ class LargeBinaryOutputStream(IOBase):
                 self._upload_thread.join()
                 self._upload_complete.wait()
 
-                # Check for errors and cleanup if needed
                 with self._lock:
                     exception = self._upload_exception
 
                 if exception is not None:
-                    self._cleanup_failed_upload()
                     raise IOError(
                         f"Failed to complete upload: {exception}"
                     ) from exception
@@ -228,15 +236,6 @@ class LargeBinaryOutputStream(IOBase):
             # the second close() call on Python 3.13+.
             super().close()
 
-    def _cleanup_failed_upload(self):
-        """Clean up a failed upload by deleting the S3 object."""
-        try:
-            s3 = large_binary_manager._get_s3_client()
-            s3.delete_object(Bucket=self._bucket_name, Key=self._object_key)
-        except Exception:
-            # Ignore cleanup errors - we're already handling an upload failure
-            pass
-
     def __enter__(self):
         """Context manager entry."""
         return self
diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
index 17725d9c66..e35f1400fd 100644
--- a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
+++ b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
@@ -248,8 +248,8 @@ class TestLargeBinaryOutputStream:
             stream.close()
 
 
-class TestCleanupFailedUpload:
-    """Direct unit tests for _cleanup_failed_upload's silent-swallow path."""
+class TestUploadWorkerCleanup:
+    """Direct unit tests for the upload worker's silent-swallow cleanup."""
 
     @pytest.fixture
     def large_binary(self):
