This is an automated email from the ASF dual-hosted git repository.

xuang7 pushed a commit to branch release/v1.2
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/release/v1.2 by this push:
     new 01eea9d9d1 fix: scope large binary storage and cleanup by execution id 
(#5280) [release/v1.2 backport] (#5602)
01eea9d9d1 is described below

commit 01eea9d9d1b7a36d99719f0fac05c92fb14bb30e
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Wed Jun 10 11:01:14 2026 -0700

    fix: scope large binary storage and cleanup by execution id (#5280) 
[release/v1.2 backport] (#5602)
    
    ### What changes were proposed in this PR?
    
    Backport of #5280 ("scope large binary storage and cleanup by execution
    id") to `release/v1.2`.
    
    As noted in #5569, #5280 cannot be backported as a single cherry-pick:
    its changes to `large_binary_output_stream.py` build on top of #5249
    ("keep failed-upload cleanup inside the upload worker"), which is not
    present on `release/v1.2`. Cherry-picking #5280 alone leaves a dangling
    reference to the `large_binary_manager` module in
    `_cleanup_failed_upload()` (a method #5249 removes), producing a
    `NameError` at runtime.
    
    This PR therefore backports the dependency chain, applying each commit
    as a clean `git cherry-pick -x` of its original squash commit:
    
    1. **#5249** — keep failed-upload cleanup inside the upload worker
    (prerequisite)
    2. **#5280** — scope large binary storage and cleanup by execution id
    
    `#4707` (which #5249 follows up on) is already present on
    `release/v1.2`, so the chain stops there. After applying both commits,
    every file touched by #5280 is byte-identical to its state on `main` at
    the merged #5280 commit (`48e800e4`), and `release/v1.2` had no
    independent changes to any of those files.
    
    ### Any related issues, documentation, discussions?
    
    Backports #5280 (which closes #4123 on `main`). Prerequisite: #5249.
    Unblocks the #5569 backport, which can then cherry-pick cleanly onto
    `release/v1.2`.
    
    ### How was this PR tested?
    
    This is a backport with no changes beyond the two cherry-picked commits,
    so it relies on the existing tests carried over from #5249 and #5280
    (`LargeBinaryManagerSpec`, `LargeBinaryManagerUnitSpec`,
    `test_large_binary_output_stream.py`, `test_large_binary_manager.py`,
    etc.).
    
    Backport fidelity was verified locally by confirming that, after the
    chain is applied onto `release/v1.2`:
    - all files touched by #5280 are byte-identical to `main@48e800e4`;
    - no callers of the removed `deleteAllObjects()` remain;
    - `deleteByExecution` is wired in `WorkflowResource` and
    `WorkflowService`.
    
    Full compile and unit-test runs are left to CI.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.8)
    
    ---------
    
    Signed-off-by: Matthew B. <[email protected]>
    Signed-off-by: Kunwoo (Chris) <[email protected]>
    Co-authored-by: Matthew B. <[email protected]>
    Co-authored-by: Xiaozhen Liu <[email protected]>
---
 .../main/python/core/models/type/large_binary.py   |   4 +-
 .../src/main/python/core/storage/storage_config.py |   5 +
 .../pytexera/storage/large_binary_input_stream.py  |   4 +-
 .../pytexera/storage/large_binary_manager.py       | 110 ++++++++++-------
 .../pytexera/storage/large_binary_output_stream.py |  28 ++---
 amber/src/main/python/texera_run_python_worker.py  |   2 +
 .../pythonworker/PythonWorkflowWorker.scala        |   3 +-
 .../scheduling/config/WorkerConfig.scala           |   9 +-
 .../engine/architecture/worker/DPThread.scala      |  10 +-
 .../architecture/worker/WorkflowWorker.scala       |   8 +-
 .../dashboard/user/workflow/WorkflowResource.scala |   6 +-
 .../texera/web/service/WorkflowService.scala       |   6 +-
 .../packaging/test_state_materialization_e2e.py    |   1 +
 .../python/core/models/type/test_large_binary.py   |   6 +-
 .../core/storage/iceberg/test_iceberg_document.py  |   1 +
 .../storage/test_large_binary_input_stream.py      |   5 +-
 .../pytexera/storage/test_large_binary_manager.py  | 133 ++++++++++++---------
 .../storage/test_large_binary_output_stream.py     |   9 +-
 .../scheduling/config/SchedulingConfigsSpec.scala  |  19 ++-
 .../engine/architecture/worker/DPThreadSpec.scala  |  32 ++++-
 common/config/src/main/resources/storage.conf      |   4 +
 .../apache/texera/amber/config/StorageConfig.scala |   1 +
 .../texera/service/util/LargeBinaryManager.scala   |  86 +++++++++----
 .../service/util/LargeBinaryManagerSpec.scala      | 115 +++++++++++++-----
 .../service/util/LargeBinaryManagerUnitSpec.scala  |  72 +++++++++++
 25 files changed, 494 insertions(+), 185 deletions(-)

diff --git a/amber/src/main/python/core/models/type/large_binary.py 
b/amber/src/main/python/core/models/type/large_binary.py
index 581a688912..34110f374e 100644
--- a/amber/src/main/python/core/models/type/large_binary.py
+++ b/amber/src/main/python/core/models/type/large_binary.py
@@ -63,9 +63,9 @@ class largebinary:
         """
         if uri is None:
             # Lazy import to avoid circular dependencies
-            from pytexera.storage import large_binary_manager
+            from pytexera.storage.large_binary_manager import 
LargeBinaryManager
 
-            uri = large_binary_manager.create()
+            uri = LargeBinaryManager().create()
 
         if not uri.startswith("s3://"):
             raise ValueError(f"largebinary URI must start with 's3://', got: 
{uri}")
diff --git a/amber/src/main/python/core/storage/storage_config.py 
b/amber/src/main/python/core/storage/storage_config.py
index 8233590987..05e8c3ee16 100644
--- a/amber/src/main/python/core/storage/storage_config.py
+++ b/amber/src/main/python/core/storage/storage_config.py
@@ -41,6 +41,9 @@ class StorageConfig:
     S3_REGION = None
     S3_AUTH_USERNAME = None
     S3_AUTH_PASSWORD = None
+    # Execution-scoped base URI (s3://bucket/objects/{eid}/) for this worker's 
large
+    # binaries; fixed at process init, which assumes one process per execution.
+    S3_LARGE_BINARIES_BASE_URI = None
 
     @classmethod
     def initialize(
@@ -59,6 +62,7 @@ class StorageConfig:
         s3_region,
         s3_auth_username,
         s3_auth_password,
+        s3_large_binaries_base_uri,
     ):
         if cls._initialized:
             raise RuntimeError(
@@ -82,6 +86,7 @@ class StorageConfig:
         cls.S3_REGION = s3_region
         cls.S3_AUTH_USERNAME = s3_auth_username
         cls.S3_AUTH_PASSWORD = s3_auth_password
+        cls.S3_LARGE_BINARIES_BASE_URI = s3_large_binaries_base_uri
 
         cls._initialized = True
 
diff --git 
a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py 
b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
index 68368c5c12..f17373c449 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
@@ -60,9 +60,9 @@ class LargeBinaryInputStream(IOBase):
 
     def _lazy_init(self):
         """Download from S3 on first read operation."""
-        from pytexera.storage import large_binary_manager
+        from pytexera.storage.large_binary_manager import LargeBinaryManager
 
-        s3 = large_binary_manager._get_s3_client()
+        s3 = LargeBinaryManager()._get_s3_client()
         response = s3.get_object(
             Bucket=self._large_binary.get_bucket_name(),
             Key=self._large_binary.get_object_key(),
diff --git a/amber/src/main/python/pytexera/storage/large_binary_manager.py 
b/amber/src/main/python/pytexera/storage/large_binary_manager.py
index e061eac622..b323c676d5 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_manager.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_manager.py
@@ -22,57 +22,83 @@ Users should not interact with this module directly. Use 
largebinary() construct
 and LargeBinaryInputStream/LargeBinaryOutputStream instead.
 """
 
-import time
+import threading
 import uuid
 from loguru import logger
 from core.storage.storage_config import StorageConfig
 
-# Module-level state
-_s3_client = None
-DEFAULT_BUCKET = "texera-large-binaries"
 
+class LargeBinaryManager:
+    """Manages large binaries in S3 for a worker process.
 
-def _get_s3_client():
-    """Get or initialize S3 client (lazy initialization, cached)."""
-    global _s3_client
-    if _s3_client is None:
-        try:
-            import boto3
-            from botocore.config import Config
-        except ImportError as e:
-            raise RuntimeError("boto3 required. Install with: pip install 
boto3") from e
+    A singleton, so the cached S3 client is shared process-wide. create() 
appends a
+    unique suffix to an execution-scoped base URI handed down by the 
controller as
+    process config (``StorageConfig.S3_LARGE_BINARIES_BASE_URI``); the worker 
never
+    holds an execution id. This is the Python counterpart of the JVM
+    ``LargeBinaryManager``, which uses a thread-local instead because one JVM 
process
+    runs many workers across executions (a Python worker is one process per 
execution).
+    """
 
-        _s3_client = boto3.client(
-            "s3",
-            endpoint_url=StorageConfig.S3_ENDPOINT,
-            aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
-            aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
-            region_name=StorageConfig.S3_REGION,
-            config=Config(signature_version="s3v4", s3={"addressing_style": 
"path"}),
-        )
-    return _s3_client
+    _instance = None
+    # Guards singleton creation and S3-client init; reached from the operator 
and upload
+    # threads.
+    _lock = threading.Lock()
 
+    def __new__(cls):
+        # Double-checked locking: skip the lock once the instance exists.
+        if cls._instance is None:
+            with cls._lock:
+                if cls._instance is None:
+                    instance = super().__new__(cls)
+                    instance._s3_client = None
+                    cls._instance = instance
+        return cls._instance
 
-def _ensure_bucket_exists(bucket: str):
-    """Ensure S3 bucket exists, creating it if necessary."""
-    s3 = _get_s3_client()
-    try:
-        s3.head_bucket(Bucket=bucket)
-    except s3.exceptions.NoSuchBucket:
-        logger.debug(f"Bucket {bucket} not found, creating it")
-        s3.create_bucket(Bucket=bucket)
-        logger.info(f"Created bucket: {bucket}")
+    def _get_s3_client(self):
+        """Get or initialize the S3 client (lazy initialization, cached)."""
+        if self._s3_client is None:
+            with self._lock:
+                if self._s3_client is None:
+                    try:
+                        import boto3
+                        from botocore.config import Config
+                    except ImportError as e:
+                        raise RuntimeError(
+                            "boto3 required. Install with: pip install boto3"
+                        ) from e
 
+                    self._s3_client = boto3.client(
+                        "s3",
+                        endpoint_url=StorageConfig.S3_ENDPOINT,
+                        aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
+                        aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
+                        region_name=StorageConfig.S3_REGION,
+                        config=Config(
+                            signature_version="s3v4", s3={"addressing_style": 
"path"}
+                        ),
+                    )
+        return self._s3_client
 
-def create() -> str:
-    """
-    Creates a new largebinary reference with a unique S3 URI.
+    def _ensure_bucket_exists(self, bucket: str):
+        """Ensure the S3 bucket exists, creating it if necessary."""
+        s3 = self._get_s3_client()
+        try:
+            s3.head_bucket(Bucket=bucket)
+        except s3.exceptions.NoSuchBucket:
+            logger.debug(f"Bucket {bucket} not found, creating it")
+            s3.create_bucket(Bucket=bucket)
+            logger.info(f"Created bucket: {bucket}")
 
-    Returns:
-        S3 URI string (format: s3://bucket/key)
-    """
-    _ensure_bucket_exists(DEFAULT_BUCKET)
-    timestamp_ms = int(time.time() * 1000)
-    unique_id = uuid.uuid4()
-    object_key = f"objects/{timestamp_ms}/{unique_id}"
-    return f"s3://{DEFAULT_BUCKET}/{object_key}"
+    def create(self) -> str:
+        """Append a unique suffix to the controller-provided base URI.
+
+        Pure string construction (no S3 round-trip); the bucket is created on 
demand at
+        upload time. Returns e.g. 
``s3://bucket/objects/{execution_id}/{uuid}``.
+        """
+        base_uri = StorageConfig.S3_LARGE_BINARIES_BASE_URI
+        if not base_uri:
+            raise RuntimeError(
+                "largebinary() requires a large-binaries base URI, but none is 
"
+                "configured (StorageConfig.S3_LARGE_BINARIES_BASE_URI is 
unset)."
+            )
+        return f"{base_uri}{uuid.uuid4()}"
diff --git 
a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py 
b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
index 0cdf8a3679..63d4caeb6b 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
@@ -29,7 +29,7 @@ Usage:
 from typing import Optional, Union
 from io import IOBase
 from core.models.type.large_binary import largebinary
-from pytexera.storage import large_binary_manager
+from pytexera.storage.large_binary_manager import LargeBinaryManager
 import threading
 import queue
 
@@ -153,14 +153,25 @@ class LargeBinaryOutputStream(IOBase):
         if self._upload_thread is None:
 
             def upload_worker():
+                s3 = None
                 try:
-                    
large_binary_manager._ensure_bucket_exists(self._bucket_name)
-                    s3 = large_binary_manager._get_s3_client()
+                    manager = LargeBinaryManager()
+                    manager._ensure_bucket_exists(self._bucket_name)
+                    s3 = manager._get_s3_client()
                     reader = _QueueReader(self._queue)
                     s3.upload_fileobj(reader, self._bucket_name, 
self._object_key)
                 except Exception as e:
+                    # Record the failure first so the next write() call can
+                    # immediately raise, then best-effort clean up the object.
                     with self._lock:
                         self._upload_exception = e
+                    if s3 is not None:
+                        try:
+                            s3.delete_object(
+                                Bucket=self._bucket_name, Key=self._object_key
+                            )
+                        except Exception:
+                            pass
                 finally:
                     self._upload_complete.set()
 
@@ -214,12 +225,10 @@ class LargeBinaryOutputStream(IOBase):
                 self._upload_thread.join()
                 self._upload_complete.wait()
 
-                # Check for errors and cleanup if needed
                 with self._lock:
                     exception = self._upload_exception
 
                 if exception is not None:
-                    self._cleanup_failed_upload()
                     raise IOError(
                         f"Failed to complete upload: {exception}"
                     ) from exception
@@ -228,15 +237,6 @@ class LargeBinaryOutputStream(IOBase):
             # the second close() call on Python 3.13+.
             super().close()
 
-    def _cleanup_failed_upload(self):
-        """Clean up a failed upload by deleting the S3 object."""
-        try:
-            s3 = large_binary_manager._get_s3_client()
-            s3.delete_object(Bucket=self._bucket_name, Key=self._object_key)
-        except Exception:
-            # Ignore cleanup errors - we're already handling an upload failure
-            pass
-
     def __enter__(self):
         """Context manager entry."""
         return self
diff --git a/amber/src/main/python/texera_run_python_worker.py 
b/amber/src/main/python/texera_run_python_worker.py
index f1ba8c7db6..5c3e25e096 100644
--- a/amber/src/main/python/texera_run_python_worker.py
+++ b/amber/src/main/python/texera_run_python_worker.py
@@ -70,6 +70,7 @@ if __name__ == "__main__":
         s3_region,
         s3_auth_username,
         s3_auth_password,
+        s3_large_binaries_base_uri,
     ) = sys.argv
     init_loguru_logger(logger_level)
     StorageConfig.initialize(
@@ -87,6 +88,7 @@ if __name__ == "__main__":
         s3_region,
         s3_auth_username,
         s3_auth_password,
+        s3_large_binaries_base_uri,
     )
 
     # Setting R_HOME environment variable for R-UDF usage
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
index b8bb3a92b6..c7ed3b6324 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
@@ -209,7 +209,8 @@ class PythonWorkflowWorker(
         StorageConfig.s3Endpoint,
         StorageConfig.s3Region,
         StorageConfig.s3Username,
-        StorageConfig.s3Password
+        StorageConfig.s3Password,
+        workerConfig.largeBinaryBaseUri
       )
     ).run(BasicIO.standard(false))
   }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala
index bb8f3d5775..8166dee6dd 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala
@@ -23,6 +23,7 @@ import org.apache.texera.amber.config.ApplicationConfig
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
 import org.apache.texera.amber.core.workflow.PhysicalOp
 import org.apache.texera.amber.util.VirtualIdentityUtils
+import org.apache.texera.service.util.LargeBinaryManager
 
 case object WorkerConfig {
   def generateWorkerConfigs(
@@ -45,7 +46,8 @@ case object WorkerConfig {
       WorkerConfig(
         VirtualIdentityUtils.createWorkerIdentity(physicalOp.workflowId, 
physicalOp.id, idx),
         pveName = physicalOp.pveName,
-        cuid = cuid
+        cuid = cuid,
+        largeBinaryBaseUri = 
LargeBinaryManager.baseUriForExecution(physicalOp.executionId.id)
       )
     )
   }
@@ -54,5 +56,8 @@ case object WorkerConfig {
 case class WorkerConfig(
     workerId: ActorVirtualIdentity,
     pveName: String = "",
-    cuid: Option[Int] = None
+    cuid: Option[Int] = None,
+    // Controller-named, execution-scoped base URI under which this worker's 
large binaries
+    // live; create() appends a unique suffix. Empty when large binaries are 
unconfigured.
+    largeBinaryBaseUri: String = ""
 )
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala
index 0c136d613a..fd4d7786c8 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala
@@ -39,6 +39,7 @@ import org.apache.texera.amber.engine.common.ambermessage.{
 }
 import org.apache.texera.amber.engine.common.virtualidentity.util.SELF
 import org.apache.texera.amber.error.ErrorUtils.safely
+import org.apache.texera.service.util.LargeBinaryManager
 
 import java.util.concurrent._
 
@@ -46,7 +47,10 @@ class DPThread(
     val actorId: ActorVirtualIdentity,
     dp: DataProcessor,
     logManager: ReplayLogManager,
-    internalQueue: LinkedBlockingQueue[DPInputQueueElement]
+    internalQueue: LinkedBlockingQueue[DPInputQueueElement],
+    // Controller-named, execution-scoped base URI for large binaries created 
on this
+    // thread (empty when unconfigured). Seeded into LargeBinaryManager at 
thread start.
+    largeBinaryBaseUri: String = ""
 ) extends AmberLogging {
 
   // initialize dp thread upon construction
@@ -91,6 +95,10 @@ class DPThread(
       dpThread = dpThreadExecutor.submit(new Runnable() {
         def run(): Unit = {
           Thread.currentThread().setName(getThreadName)
+          // Seed this thread's large-binary base URI (from WorkerConfig) 
before any tuple,
+          // so create() can append a suffix without the execution id. Once 
per thread,
+          // assuming a thread serves one execution.
+          LargeBinaryManager.setCurrentBaseUri(largeBinaryBaseUri)
           logger.info("DP thread started")
           startFuture.complete(())
           dp.statisticsManager.initializeWorkerStartTime(System.nanoTime())
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala
index d1a0a300d9..13b1d94b04 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala
@@ -107,7 +107,13 @@ class WorkflowWorker(
       )
     }
     // dp is ready
-    dpThread = new DPThread(workerConfig.workerId, dp, logManager, inputQueue)
+    dpThread = new DPThread(
+      workerConfig.workerId,
+      dp,
+      logManager,
+      inputQueue,
+      workerConfig.largeBinaryBaseUri
+    )
     dpThread.start()
   }
 
diff --git 
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
 
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
index cb910d11c3..ee559048c9 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
@@ -611,8 +611,6 @@ class WorkflowResource extends LazyLogging {
         .asScala
         .toList
 
-      LargeBinaryManager.deleteAllObjects()
-
       // Collect all URIs related to executions for cleanup
       val uris = eids.flatMap { eid =>
         val executionId = ExecutionIdentity(eid.longValue())
@@ -638,6 +636,10 @@ class WorkflowResource extends LazyLogging {
         }
       }
 
+      // Delete large binaries for each execution belonging to the workflows 
being
+      // removed. Done after the transaction (like the document cleanup below).
+      eids.foreach(eid => 
LargeBinaryManager.deleteByExecution(eid.longValue()))
+
       // Clean up document storage
       try {
         uris.foreach { uri =>
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala 
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
index 0044c95967..c18b8b50e8 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
@@ -318,7 +318,7 @@ class WorkflowService(
     *  2. Clears URI references from the execution registry
     *  3. Safely clears all result and console message documents
     *  4. Expires Iceberg snapshots for runtime statistics
-    *  5. Deletes large binaries from MinIO
+    *  5. Deletes this execution's large binaries from MinIO
     *
     * @param eid The execution identity to clean up resources for
     */
@@ -355,7 +355,7 @@ class WorkflowService(
           logger.debug(s"Error processing document at $uri: 
${error.getMessage}")
       }
     }
-    // Delete large binaries
-    LargeBinaryManager.deleteAllObjects()
+    // Delete this execution's large binaries
+    LargeBinaryManager.deleteByExecution(eid.id)
   }
 }
diff --git 
a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
 
b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
index 9d1fd30698..cfc4f7f676 100644
--- 
a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
+++ 
b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
@@ -108,6 +108,7 @@ def sqlite_iceberg_catalog():
             s3_region="unused",
             s3_auth_username="unused",
             s3_auth_password="unused",
+            s3_large_binaries_base_uri="s3://texera-large-binaries/objects/0/",
         )
 
     original_instance = IcebergCatalogInstance._instance
diff --git a/amber/src/test/python/core/models/type/test_large_binary.py 
b/amber/src/test/python/core/models/type/test_large_binary.py
index 36310e1dd5..56348f4265 100644
--- a/amber/src/test/python/core/models/type/test_large_binary.py
+++ b/amber/src/test/python/core/models/type/test_large_binary.py
@@ -18,6 +18,10 @@
 import pytest
 from unittest.mock import patch
 from core.models.type.large_binary import largebinary
+from pytexera.storage.large_binary_manager import LargeBinaryManager
+
+# The manager is a singleton; bind the shared instance for the tests.
+large_binary_manager = LargeBinaryManager()
 
 
 class TestLargeBinary:
@@ -31,7 +35,7 @@ class TestLargeBinary:
 
     def test_create_without_uri(self):
         """Test creating largebinary without URI (calls 
large_binary_manager.create)."""
-        with patch("pytexera.storage.large_binary_manager.create") as 
mock_create:
+        with patch.object(large_binary_manager, "create") as mock_create:
             mock_create.return_value = "s3://bucket/objects/123/uuid"
             large_binary = largebinary()
             assert large_binary.uri == "s3://bucket/objects/123/uuid"
diff --git 
a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py 
b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
index bdba00ba0c..5e61b4ed68 100644
--- a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
@@ -57,6 +57,7 @@ StorageConfig.initialize(
     s3_region="us-east-1",
     s3_auth_username="minioadmin",
     s3_auth_password="minioadmin",
+    s3_large_binaries_base_uri="s3://texera-large-binaries/objects/0/",
 )
 
 
diff --git 
a/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py 
b/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py
index 35bc5bc634..7e6a401aef 100644
--- a/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py
+++ b/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py
@@ -20,7 +20,10 @@ from unittest.mock import patch, MagicMock
 from io import BytesIO
 from core.models.type.large_binary import largebinary
 from pytexera.storage.large_binary_input_stream import LargeBinaryInputStream
-from pytexera.storage import large_binary_manager
+from pytexera.storage.large_binary_manager import LargeBinaryManager
+
+# The manager is a singleton; bind the shared instance for the tests.
+large_binary_manager = LargeBinaryManager()
 
 
 class TestLargeBinaryInputStream:
diff --git 
a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py 
b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
index 1942e91f8b..e1da5eee18 100644
--- a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
+++ b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
@@ -15,34 +15,44 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import re
+
 import pytest
 from unittest.mock import patch, MagicMock
-from pytexera.storage import large_binary_manager
+from pytexera.storage.large_binary_manager import LargeBinaryManager
 from core.storage.storage_config import StorageConfig
 
+# The manager is a singleton; bind the shared instance for the tests.
+large_binary_manager = LargeBinaryManager()
+
+# Execution-scoped base URI the controller hands down; create() appends a 
unique suffix.
+TEST_BASE_URI = "s3://texera-large-binaries/objects/1/"
+
+
+@pytest.fixture(autouse=True)
+def _init_storage_config():
+    """Initialize StorageConfig (incl. the large-binaries base URI) for every 
test."""
+    if not StorageConfig._initialized:
+        StorageConfig.initialize(
+            catalog_type="postgres",
+            postgres_uri_without_scheme="localhost:5432/test",
+            postgres_username="test",
+            postgres_password="test",
+            rest_catalog_uri="http://localhost:8181/catalog/",
+            rest_catalog_warehouse_name="texera",
+            table_result_namespace="test",
+            table_state_namespace="test-state",
+            directory_path="/tmp/test",
+            commit_batch_size=1000,
+            s3_endpoint="http://localhost:9000",
+            s3_region="us-east-1",
+            s3_auth_username="minioadmin",
+            s3_auth_password="minioadmin",
+            s3_large_binaries_base_uri=TEST_BASE_URI,
+        )
 
-class TestLargeBinaryManager:
-    @pytest.fixture(autouse=True)
-    def setup_storage_config(self):
-        """Initialize StorageConfig for tests."""
-        if not StorageConfig._initialized:
-            StorageConfig.initialize(
-                catalog_type="postgres",
-                postgres_uri_without_scheme="localhost:5432/test",
-                postgres_username="test",
-                postgres_password="test",
-                rest_catalog_uri="http://localhost:8181/catalog/",
-                rest_catalog_warehouse_name="texera",
-                table_result_namespace="test",
-                table_state_namespace="test-state",
-                directory_path="/tmp/test",
-                commit_batch_size=1000,
-                s3_endpoint="http://localhost:9000",
-                s3_region="us-east-1",
-                s3_auth_username="minioadmin",
-                s3_auth_password="minioadmin",
-            )
 
+class TestLargeBinaryManager:
     def test_get_s3_client_initializes_once(self):
         """Test that S3 client is initialized and cached."""
         # Reset the client
@@ -118,37 +128,48 @@ class TestLargeBinaryManager:
             
mock_client.head_bucket.assert_called_once_with(Bucket="test-bucket")
             
mock_client.create_bucket.assert_called_once_with(Bucket="test-bucket")
 
-    def test_create_generates_unique_uri(self):
-        """Test that create() generates a unique S3 URI."""
-        large_binary_manager._s3_client = None
-
-        with patch("boto3.client") as mock_boto3_client:
-            mock_client = MagicMock()
-            mock_boto3_client.return_value = mock_client
-            mock_client.head_bucket.return_value = None
-            mock_client.exceptions.NoSuchBucket = type("NoSuchBucket", 
(Exception,), {})
-
-            uri = large_binary_manager.create()
-
-            # Check URI format
-            assert uri.startswith("s3://")
-            assert 
uri.startswith(f"s3://{large_binary_manager.DEFAULT_BUCKET}/")
-            assert "objects/" in uri
-
-            # Verify bucket was checked/created
-            mock_client.head_bucket.assert_called_once_with(
-                Bucket=large_binary_manager.DEFAULT_BUCKET
-            )
-
-    def test_create_uses_default_bucket(self):
-        """Test that create() uses the default bucket."""
-        large_binary_manager._s3_client = None
-
-        with patch("boto3.client") as mock_boto3_client:
-            mock_client = MagicMock()
-            mock_boto3_client.return_value = mock_client
-            mock_client.head_bucket.return_value = None
-            mock_client.exceptions.NoSuchBucket = type("NoSuchBucket", 
(Exception,), {})
-
-            uri = large_binary_manager.create()
-            assert large_binary_manager.DEFAULT_BUCKET in uri
+    def test_create_appends_unique_suffix_to_base_uri(self):
+        """create() returns the configured base URI plus a unique suffix (no 
S3 call)."""
+        base = StorageConfig.S3_LARGE_BINARIES_BASE_URI
+
+        uri1 = large_binary_manager.create()
+        uri2 = large_binary_manager.create()
+
+        assert uri1.startswith(base)
+        assert uri2.startswith(base)
+        # A non-empty, unique suffix follows the base URI.
+        assert uri1 != base
+        assert uri1 != uri2
+
+
+def test_create_matches_execution_scoped_key_shape(monkeypatch):
+    # The base URI is execution-scoped (controller-named); create() only 
appends a uuid.
+    monkeypatch.setattr(
+        StorageConfig,
+        "S3_LARGE_BINARIES_BASE_URI",
+        "s3://texera-large-binaries/objects/42/",
+    )
+    uri = large_binary_manager.create()
+    assert re.fullmatch(r"s3://texera-large-binaries/objects/42/[0-9a-fA-F-]+", uri)
+
+
+def test_create_without_base_uri_raises(monkeypatch):
+    # An unconfigured base URI should fail with a clear error, not a cryptic 
S3 one.
+    monkeypatch.setattr(StorageConfig, "S3_LARGE_BINARIES_BASE_URI", None)
+    with pytest.raises(RuntimeError):
+        large_binary_manager.create()
+
+
+def test_largebinarymanager_is_a_singleton():
+    # Constructing the manager always returns the same shared instance.
+    assert LargeBinaryManager() is LargeBinaryManager()
+
+    # State (the cached S3 client) is shared across handles (same instance).
+    mgr = LargeBinaryManager()
+    original = mgr._s3_client
+    sentinel = object()
+    mgr._s3_client = sentinel
+    try:
+        assert LargeBinaryManager()._s3_client is sentinel
+    finally:
+        mgr._s3_client = original
diff --git 
a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py 
b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
index 17725d9c66..dc8881ab16 100644
--- a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
+++ b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
@@ -24,7 +24,10 @@ from pytexera.storage.large_binary_output_stream import (
     LargeBinaryOutputStream,
     _QueueReader,
 )
-from pytexera.storage import large_binary_manager
+from pytexera.storage.large_binary_manager import LargeBinaryManager
+
+# The manager is a singleton; bind the shared instance for the tests.
+large_binary_manager = LargeBinaryManager()
 
 
 class TestLargeBinaryOutputStream:
@@ -248,8 +251,8 @@ class TestLargeBinaryOutputStream:
             stream.close()
 
 
-class TestCleanupFailedUpload:
-    """Direct unit tests for _cleanup_failed_upload's silent-swallow path."""
+class TestUploadWorkerCleanup:
+    """Direct unit tests for the upload worker's silent-swallow cleanup."""
 
     @pytest.fixture
     def large_binary(self):
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala
index 25ef739251..31ab5c8e1a 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala
@@ -31,6 +31,7 @@ import org.apache.texera.amber.core.virtualidentity.{
 }
 import org.apache.texera.amber.core.workflow._
 import 
org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._
+import org.apache.texera.service.util.LargeBinaryManager
 import org.scalatest.flatspec.AnyFlatSpec
 
 import java.net.URI
@@ -273,11 +274,15 @@ class SchedulingConfigsSpec extends AnyFlatSpec {
   // WorkerConfig.generateWorkerConfigs
   // 
---------------------------------------------------------------------------
 
-  private def physicalOp(parallelizable: Boolean, suggested: Option[Int]): 
PhysicalOp =
+  private def physicalOp(
+      parallelizable: Boolean,
+      suggested: Option[Int],
+      executionId: ExecutionIdentity = ExecutionIdentity(0)
+  ): PhysicalOp =
     PhysicalOp(
       PhysicalOpIdentity(OperatorIdentity("op"), "main"),
       WorkflowIdentity(0),
-      ExecutionIdentity(0),
+      executionId,
       OpExecInitInfo.Empty,
       parallelizable = parallelizable,
       suggestedWorkerNum = suggested
@@ -308,4 +313,14 @@ class SchedulingConfigsSpec extends AnyFlatSpec {
       WorkerConfig.generateWorkerConfigs(physicalOp(parallelizable = true, 
suggested = None))
     assert(configs.size == ApplicationConfig.numWorkerPerOperatorByDefault)
   }
+
+  it should "set largeBinaryBaseUri to the execution-scoped base URI for every 
worker" in {
+    val eid = ExecutionIdentity(42L)
+    val configs = WorkerConfig.generateWorkerConfigs(
+      physicalOp(parallelizable = true, suggested = Some(3), executionId = eid)
+    )
+    val expected = LargeBinaryManager.baseUriForExecution(eid.id)
+    assert(expected.contains(s"objects/${eid.id}/"))
+    assert(configs.forall(_.largeBinaryBaseUri == expected))
+  }
 }
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala
index d8b5d57d63..eb1241182e 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala
@@ -42,11 +42,12 @@ import 
org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFI
 import 
org.apache.texera.amber.engine.common.rpc.AsyncRPCClient.ControlInvocation
 import org.apache.texera.amber.engine.common.storage.SequentialRecordStorage
 import org.apache.texera.amber.engine.common.virtualidentity.util.SELF
+import org.apache.texera.service.util.LargeBinaryManager
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.flatspec.AnyFlatSpec
 
 import java.net.URI
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{CompletableFuture, LinkedBlockingQueue, TimeUnit}
 
 class DPThreadSpec extends AnyFlatSpec with MockFactory {
 
@@ -240,4 +241,33 @@ class DPThreadSpec extends AnyFlatSpec with MockFactory {
     assert(logs.length > 1)
   }
 
+  "DP Thread" should "seed the base URI so create() yields execution-scoped 
keys" in {
+    val eid = 7777L
+    val baseUri = LargeBinaryManager.baseUriForExecution(eid)
+    // create() runs on the DP thread; capture what it produces there.
+    val capturedUri = new CompletableFuture[String]()
+    val inputQueue = new LinkedBlockingQueue[DPInputQueueElement]()
+    val dp = new DataProcessor(workerId, x => {}, inputMessageQueue = 
inputQueue)
+    dp.executor = new OperatorExecutor {
+      override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] 
= {
+        capturedUri.complete(LargeBinaryManager.create())
+        Iterator.empty
+      }
+    }
+    dp.inputManager.addPort(mockInputPortId, schema, List.empty, List.empty)
+    dp.inputGateway.getChannel(dataChannelId).setPortId(mockInputPortId)
+    dp.adaptiveBatchingMonitor = mock[WorkerTimerService]
+    (dp.adaptiveBatchingMonitor.resumeAdaptiveBatching 
_).expects().anyNumberOfTimes()
+    val dpThread = new DPThread(workerId, dp, logManager, inputQueue, baseUri)
+    dpThread.start()
+    inputQueue.put(
+      FIFOMessageElement(WorkflowFIFOMessage(dataChannelId, 0, 
DataFrame(Array(tuples(0)))))
+    )
+
+    val uri = capturedUri.get(5, TimeUnit.SECONDS)
+    assert(uri.startsWith(s"s3://${LargeBinaryManager.DEFAULT_BUCKET}/objects/$eid/"))
+    // a unique suffix is appended to the execution-scoped base URI
+    assert(uri.length > baseUri.length)
+  }
+
 }
diff --git a/common/config/src/main/resources/storage.conf 
b/common/config/src/main/resources/storage.conf
index 0682109f19..12a9919e04 100644
--- a/common/config/src/main/resources/storage.conf
+++ b/common/config/src/main/resources/storage.conf
@@ -115,6 +115,10 @@ storage {
         region = "us-west-2"
         region = ${?STORAGE_S3_REGION}
 
+        # Bucket for large binaries; shared by the JVM and Python workers and 
cleanup.
+        large-binaries-bucket = "texera-large-binaries"
+        large-binaries-bucket = ${?STORAGE_S3_LARGE_BINARIES_BUCKET}
+
         multipart {
             part-size = "16MB"
             part-size = ${?STORAGE_S3_MULTIPART_PART_SIZE}
diff --git 
a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
 
b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
index 07447cfdbe..c88541cf1b 100644
--- 
a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
+++ 
b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
@@ -80,6 +80,7 @@ object StorageConfig {
   val s3Region: String = conf.getString("storage.s3.region")
   val s3Username: String = conf.getString("storage.s3.auth.username")
   val s3Password: String = conf.getString("storage.s3.auth.password")
+  val s3LargeBinariesBucket: String = 
conf.getString("storage.s3.large-binaries-bucket")
   val s3MultipartUploadPartSize: Long = parseSizeStringToBytes(
     conf.getString("storage.s3.multipart.part-size")
   )
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
index 44db3929f2..2fa4acb530 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
@@ -20,44 +20,88 @@
 package org.apache.texera.service.util
 
 import com.typesafe.scalalogging.LazyLogging
+import org.apache.texera.amber.config.StorageConfig
 
 import java.util.UUID
 
-/**
-  * Manages the lifecycle of LargeBinaries stored in S3.
-  *
-  * Handles creation and deletion of large objects that exceed
-  * normal tuple size limits.
-  */
+/** Manages the lifecycle of LargeBinaries (objects too large for normal 
tuples) in S3. */
 object LargeBinaryManager extends LazyLogging {
-  val DEFAULT_BUCKET: String = "texera-large-binaries"
+  // From config so the JVM/Python workers and cleanup all share one bucket.
+  val DEFAULT_BUCKET: String = StorageConfig.s3LargeBinariesBucket
+
+  /** Per-execution key prefix; the single source of truth for the write and 
delete paths. */
+  private def executionPrefix(executionId: Long): String = 
s"objects/$executionId"
 
   /**
-    * Creates a new LargeBinary reference.
-    * The actual data upload happens separately via LargeBinaryOutputStream.
+    * Base URI (trailing slash) under which `executionId`'s large binaries 
live; create()
+    * appends a unique suffix. Empty when the bucket is unconfigured, so 
create() fails loudly.
     *
-    * @return S3 URI string for the new LargeBinary (format: s3://bucket/key)
+    * `executionId` must be a persisted EID. The sentinel id 
(DEFAULT_EXECUTION_ID = 1) shares
+    * this space, so binaries must only be created under a real execution — 
else execution 1
+    * and a default-context run would collide under objects/1/.
     */
-  def create(): String = {
-    val objectKey = 
s"objects/${System.currentTimeMillis()}/${UUID.randomUUID()}"
-    val uri = s"s3://$DEFAULT_BUCKET/$objectKey"
+  def baseUriForExecution(executionId: Long): String =
+    if (DEFAULT_BUCKET.isEmpty) ""
+    else s"s3://$DEFAULT_BUCKET/${executionPrefix(executionId)}/"
+
+  /**
+    * Base URI for binaries created on the current thread — thread-local 
because create()
+    * runs on the DP thread. Seeded once at DP-thread start, assuming a thread 
serves one
+    * execution; re-seed if workers are ever reused across executions.
+    */
+  private val currentBaseUri: ThreadLocal[Option[String]] =
+    ThreadLocal.withInitial(() => Option.empty[String])
+
+  /** Sets the current thread's base URI; an empty value clears it, so 
create() fails loudly. */
+  def setCurrentBaseUri(baseUri: String): Unit =
+    currentBaseUri.set(Option(baseUri).filter(_.nonEmpty))
 
-    uri
+  /**
+    * Creates a LargeBinary reference by appending a unique suffix to the 
current thread's
+    * base URI. Data is uploaded separately via LargeBinaryOutputStream.
+    *
+    * @return e.g. s3://bucket/objects/{eid}/{uuid}
+    */
+  def create(): String = {
+    val baseUri = currentBaseUri
+      .get()
+      .getOrElse(
+        throw new IllegalStateException(
+          "LargeBinaryManager.create() requires a base URI, " +
+            "but none was set on the current thread."
+        )
+      )
+    s"$baseUri${UUID.randomUUID()}"
   }
 
   /**
-    * Deletes all large binaries from the bucket.
+    * Deletes all large binaries for one execution. Uses deleteDirectory, 
which removes one
+    * listing page (<= 1000 objects) — enough for expected counts; more needs 
a paginated delete.
     *
-    * @throws java.lang.Exception if the deletion fails
-    * @return Unit
+    * @param executionId the execution whose large binaries should be removed
     */
-  def deleteAllObjects(): Unit = {
+  def deleteByExecution(executionId: Long): Unit =
+    deleteByExecution(executionId, S3StorageClient.deleteDirectory)
+
+  /** Overload taking the delete op as a parameter. Visible for testing. */
+  private[util] def deleteByExecution(
+      executionId: Long,
+      deleteDir: (String, String) => Unit
+  ): Unit = {
     try {
-      S3StorageClient.deleteDirectory(DEFAULT_BUCKET, "objects")
-      logger.info(s"Successfully deleted all large binaries from bucket: 
$DEFAULT_BUCKET")
+      deleteDir(DEFAULT_BUCKET, executionPrefix(executionId))
+      logger.info(
+        s"Deleted large binaries for execution $executionId from bucket: 
$DEFAULT_BUCKET"
+      )
     } catch {
+      // Swallowed: cleanup is a side effect of deletion and must not fail it. 
Logged at
+      // error because a failure here silently leaks storage.
       case e: Exception =>
-        logger.warn(s"Failed to delete large binaries from bucket: 
$DEFAULT_BUCKET", e)
+        logger.error(
+          s"Failed to delete large binaries for execution $executionId " +
+            s"from bucket: $DEFAULT_BUCKET",
+          e
+        )
     }
   }
 
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala
index 77d142efee..a0436e682e 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala
@@ -21,8 +21,22 @@ package org.apache.texera.service.util
 
 import org.apache.texera.amber.core.tuple.LargeBinary
 import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.BeforeAndAfterEach
 
-class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase {
+class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase with 
BeforeAndAfterEach {
+
+  /** Execution id used by the bulk of the tests. */
+  private val TestExecutionId: Long = 9999L
+
+  /** Seeds the thread's base URI for an execution, as the controller does in 
production. */
+  private def setExecutionContext(eid: Long): Unit =
+    
LargeBinaryManager.setCurrentBaseUri(LargeBinaryManager.baseUriForExecution(eid))
+
+  /** Each test creates large binaries; they need a base URI on the thread. */
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    setExecutionContext(TestExecutionId)
+  }
 
   /** Creates a large binary from string data and returns it. */
   private def createLargeBinary(data: String): LargeBinary = {
@@ -54,7 +68,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
     assert(stream.readAllBytes().sameElements(data.getBytes))
     stream.close()
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryInputStream should read exact number of bytes") {
@@ -67,7 +81,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
     assert(result.sameElements("0123456789".getBytes))
     stream.close()
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryInputStream should handle reading more bytes than 
available") {
@@ -81,7 +95,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
     assert(result.sameElements(data.getBytes))
     stream.close()
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryInputStream should support standard single-byte read") {
@@ -94,7 +108,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
     assert(stream.read() == -1) // EOF
     stream.close()
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryInputStream should return -1 at EOF") {
@@ -105,7 +119,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
     assert(stream.read() == -1)
     stream.close()
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryInputStream should throw exception when reading from closed 
stream") {
@@ -117,7 +131,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
     assertThrows[java.io.IOException](stream.read())
     assertThrows[java.io.IOException](stream.readAllBytes())
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryInputStream should handle multiple close calls") {
@@ -127,7 +141,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
     stream.close()
     stream.close() // Should not throw
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryInputStream should read large data correctly") {
@@ -145,7 +159,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
     assert(result.sameElements(largeData))
     stream.close()
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   // ========================================
@@ -183,7 +197,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
     }
   }
 
-  test("LargeBinaryManager should delete all large binaries") {
+  test("deleteByExecution removes binaries written via new LargeBinary()") {
     val pointer1 = new LargeBinary()
     val out1 = new LargeBinaryOutputStream(pointer1)
     try {
@@ -200,18 +214,18 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
       out2.close()
     }
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryManager should handle delete with no objects gracefully") {
-    LargeBinaryManager.deleteAllObjects() // Should not throw exception
+    LargeBinaryManager.deleteByExecution(TestExecutionId) // Should not throw 
exception
   }
 
-  test("LargeBinaryManager should delete all objects") {
+  test("deleteByExecution removes multiple binaries for the execution") {
     val pointer1 = createLargeBinary("Test data")
     val pointer2 = createLargeBinary("Test data")
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryManager should create bucket if it doesn't exist") {
@@ -219,7 +233,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
 
     assertStandardBucket(pointer)
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryManager should handle large objects correctly") {
@@ -237,7 +251,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
     stream.close()
 
     assert(readData.sameElements(largeData))
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryManager should generate unique URIs for different objects") 
{
@@ -261,7 +275,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
     assert(pointer1.getUri != pointer2.getUri)
     assert(pointer1.getObjectKey != pointer2.getObjectKey)
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryInputStream should handle multiple reads from the same 
large binary") {
@@ -279,7 +293,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
     assert(readData1.sameElements(data.getBytes))
     assert(readData2.sameElements(data.getBytes))
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryManager should properly parse bucket name and object key 
from large binary") {
@@ -289,7 +303,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
     assert(largeBinary.getObjectKey.nonEmpty)
     assert(!largeBinary.getObjectKey.startsWith("/"))
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   // ========================================
@@ -309,7 +323,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
 
     assertStandardBucket(largeBinary)
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryInputStream constructor should read large binary contents") 
{
@@ -322,7 +336,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
 
     assert(readData.sameElements(data.getBytes))
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryOutputStream and LargeBinaryInputStream should work 
together end-to-end") {
@@ -344,7 +358,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
 
     assert(readData.sameElements(data.getBytes))
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   // ========================================
@@ -368,7 +382,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
 
     assert(readData.sameElements(data.getBytes))
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryOutputStream should create large binary") {
@@ -381,7 +395,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
 
     assertStandardBucket(largeBinary)
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryOutputStream should handle large data correctly") {
@@ -399,7 +413,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
 
     assert(readData.sameElements(largeData))
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryOutputStream should handle multiple writes") {
@@ -416,7 +430,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
 
     assert(readData.sameElements("Hello World!".getBytes))
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryOutputStream should throw exception when writing to closed 
stream") {
@@ -427,7 +441,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
 
     assertThrows[java.io.IOException](outStream.write("more".getBytes))
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinaryOutputStream should handle close() being called multiple 
times") {
@@ -437,7 +451,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
     outStream.close()
     outStream.close() // Should not throw
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("New LargeBinary() constructor should create unique URIs") {
@@ -447,7 +461,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
     assert(largeBinary1.getUri != largeBinary2.getUri)
     assert(largeBinary1.getObjectKey != largeBinary2.getObjectKey)
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
   }
 
   test("LargeBinary() and LargeBinaryOutputStream API should be symmetric with 
input") {
@@ -466,6 +480,47 @@ class LargeBinaryManagerSpec extends AnyFunSuite with 
S3StorageTestBase {
 
     assert(readData.sameElements(data.getBytes))
 
-    LargeBinaryManager.deleteAllObjects()
+    LargeBinaryManager.deleteByExecution(TestExecutionId)
+  }
+
+  test("deleteByExecution removes only the target execution's binaries") {
+    // Create one binary under execution 1001 and another under 1002.
+    setExecutionContext(1001L)
+    createLargeBinary("data for 1001")
+    setExecutionContext(1002L)
+    createLargeBinary("data for 1002")
+
+    // Delete only execution 1001's binaries.
+    LargeBinaryManager.deleteByExecution(1001L)
+
+    try {
+      assert(!S3StorageClient.directoryExists("texera-large-binaries", 
"objects/1001"))
+      assert(S3StorageClient.directoryExists("texera-large-binaries", 
"objects/1002"))
+    } finally {
+      // Keep the test self-contained: clean up 1002's objects and reset the 
thread's
+      // base URI rather than relying on the next test's beforeEach.
+      LargeBinaryManager.deleteByExecution(1002L)
+      setExecutionContext(TestExecutionId)
+    }
+  }
+
+  test("deleteByExecution distinguishes executions whose ids share a numeric 
prefix") {
+    // "objects/1" is a string prefix of "objects/11"; a prefix delete that 
dropped the
+    // trailing "/" would wipe both. Pin that deleting execution 1 leaves 11 
untouched.
+    setExecutionContext(1L)
+    createLargeBinary("data for execution 1")
+    setExecutionContext(11L)
+    createLargeBinary("data for execution 11")
+
+    LargeBinaryManager.deleteByExecution(1L)
+
+    try {
+      assert(!S3StorageClient.directoryExists("texera-large-binaries", 
"objects/1"))
+      assert(S3StorageClient.directoryExists("texera-large-binaries", 
"objects/11"))
+    } finally {
+      // Self-contained cleanup, mirroring the isolation test above.
+      LargeBinaryManager.deleteByExecution(11L)
+      setExecutionContext(TestExecutionId)
+    }
   }
 }
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala
new file mode 100644
index 0000000000..d6268d29d3
--- /dev/null
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import org.scalatest.funsuite.AnyFunSuite
+
+/**
+  * Unit tests for [[LargeBinaryManager]].
+  *
+  * `deleteByExecution` is called with the directory-delete op injected, so success and the
+  * swallow-and-log error path run without a live S3 endpoint. The `create` tests run on
+  * dedicated threads because `create` reads a thread-local base URI.
+  */
+class LargeBinaryManagerUnitSpec extends AnyFunSuite {
+
+  test("deleteByExecution issues a delete scoped to the execution's object prefix") {
+    // Record the (bucket, prefix) pair handed to the injected delete op.
+    var captured: Option[(String, String)] = None
+    LargeBinaryManager.deleteByExecution(
+      42L,
+      (bucket, prefix) => captured = Some((bucket, prefix))
+    )
+    assert(captured.contains((LargeBinaryManager.DEFAULT_BUCKET, "objects/42")))
+  }
+
+  test("deleteByExecution swallows exceptions raised by the underlying delete") {
+    // The error path logs and returns; it must not propagate the failure to callers.
+    LargeBinaryManager.deleteByExecution(7L, (_, _) => throw new RuntimeException("boom"))
+    // Reaching this line is the assertion: the call above returned normally.
+    succeed
+  }
+
+  test("create returns a URI under the current thread's base URI") {
+    // create() reads a thread-local; run on a dedicated thread so the base URI is
+    // isolated and does not leak into other tests.
+    @volatile var uri: String = ""
+    val thread = new Thread(() => {
+      LargeBinaryManager.setCurrentBaseUri(LargeBinaryManager.baseUriForExecution(555L))
+      uri = LargeBinaryManager.create()
+    })
+    thread.start()
+    thread.join()
+    val prefix = s"s3://${LargeBinaryManager.DEFAULT_BUCKET}/objects/555/"
+    assert(uri.startsWith(prefix))
+    // a unique (UUID) suffix follows the execution-scoped prefix
+    assert(uri.stripPrefix(prefix).nonEmpty)
+  }
+
+  test("create throws when no base URI is set on the thread") {
+    // A fresh thread starts with no base URI, so create() must fail fast.
+    @volatile var caught: Option[Throwable] = None
+    val thread = new Thread(() => {
+      // Capture only non-fatal failures; JVM-fatal errors should surface unmasked.
+      try LargeBinaryManager.create()
+      catch { case scala.util.control.NonFatal(e) => caught = Some(e) }
+    })
+    thread.start()
+    thread.join()
+    // Match on the captured value so a failure reports what was actually observed.
+    caught match {
+      case Some(_: IllegalStateException) => succeed
+      case other => fail(s"expected IllegalStateException, got: $other")
+    }
+  }
+}

Reply via email to