This is an automated email from the ASF dual-hosted git repository.
xuang7 pushed a commit to branch release/v1.2
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/release/v1.2 by this push:
new 01eea9d9d1 fix: scope large binary storage and cleanup by execution id
(#5280) [release/v1.2 backport] (#5602)
01eea9d9d1 is described below
commit 01eea9d9d1b7a36d99719f0fac05c92fb14bb30e
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Wed Jun 10 11:01:14 2026 -0700
fix: scope large binary storage and cleanup by execution id (#5280)
[release/v1.2 backport] (#5602)
### What changes were proposed in this PR?
Backport of #5280 ("scope large binary storage and cleanup by execution
id") to `release/v1.2`.
As noted in #5569, #5280 cannot be backported as a single cherry-pick:
its changes to `large_binary_output_stream.py` build on top of #5249
("keep failed-upload cleanup inside the upload worker"), which is not
present on `release/v1.2`. Cherry-picking #5280 alone leaves a dangling
reference to the `large_binary_manager` module in
`_cleanup_failed_upload()` (a method #5249 removes), producing a
`NameError` at runtime.
This PR therefore backports the dependency chain, each commit applied as a
clean `git cherry-pick -x` of its original squash commit:
1. **#5249** — keep failed-upload cleanup inside the upload worker
(prerequisite)
2. **#5280** — scope large binary storage and cleanup by execution id
`#4707` (which #5249 follows up on) is already present on
`release/v1.2`, so the chain stops there. After applying both commits,
every file touched by #5280 is byte-identical to its state on `main` at
the merged #5280 commit (`48e800e4`), and `release/v1.2` had no
independent changes to any of those files.
### Any related issues, documentation, discussions?
Backports #5280 (which closes #4123 on `main`). Prerequisite: #5249.
Unblocks the #5569 backport, which can then cherry-pick cleanly onto
`release/v1.2`.
### How was this PR tested?
This is a backport with no changes beyond the two cherry-picked commits,
so it relies on the existing tests carried over from #5249 and #5280
(`LargeBinaryManagerSpec`, `LargeBinaryManagerUnitSpec`,
`test_large_binary_output_stream.py`, `test_large_binary_manager.py`,
etc.).
Backport fidelity was verified locally by confirming that, after the
chain is applied onto `release/v1.2`:
- all files touched by #5280 are byte-identical to `main@48e800e4`;
- no callers of the removed `deleteAllObjects()` remain;
- `deleteByExecution` is wired in `WorkflowResource` and
`WorkflowService`.
Full compile and unit-test runs are left to CI.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.8)
---------
Signed-off-by: Matthew B. <[email protected]>
Signed-off-by: Kunwoo (Chris) <[email protected]>
Co-authored-by: Matthew B. <[email protected]>
Co-authored-by: Xiaozhen Liu <[email protected]>
---
.../main/python/core/models/type/large_binary.py | 4 +-
.../src/main/python/core/storage/storage_config.py | 5 +
.../pytexera/storage/large_binary_input_stream.py | 4 +-
.../pytexera/storage/large_binary_manager.py | 110 ++++++++++-------
.../pytexera/storage/large_binary_output_stream.py | 28 ++---
amber/src/main/python/texera_run_python_worker.py | 2 +
.../pythonworker/PythonWorkflowWorker.scala | 3 +-
.../scheduling/config/WorkerConfig.scala | 9 +-
.../engine/architecture/worker/DPThread.scala | 10 +-
.../architecture/worker/WorkflowWorker.scala | 8 +-
.../dashboard/user/workflow/WorkflowResource.scala | 6 +-
.../texera/web/service/WorkflowService.scala | 6 +-
.../packaging/test_state_materialization_e2e.py | 1 +
.../python/core/models/type/test_large_binary.py | 6 +-
.../core/storage/iceberg/test_iceberg_document.py | 1 +
.../storage/test_large_binary_input_stream.py | 5 +-
.../pytexera/storage/test_large_binary_manager.py | 133 ++++++++++++---------
.../storage/test_large_binary_output_stream.py | 9 +-
.../scheduling/config/SchedulingConfigsSpec.scala | 19 ++-
.../engine/architecture/worker/DPThreadSpec.scala | 32 ++++-
common/config/src/main/resources/storage.conf | 4 +
.../apache/texera/amber/config/StorageConfig.scala | 1 +
.../texera/service/util/LargeBinaryManager.scala | 86 +++++++++----
.../service/util/LargeBinaryManagerSpec.scala | 115 +++++++++++++-----
.../service/util/LargeBinaryManagerUnitSpec.scala | 72 +++++++++++
25 files changed, 494 insertions(+), 185 deletions(-)
diff --git a/amber/src/main/python/core/models/type/large_binary.py
b/amber/src/main/python/core/models/type/large_binary.py
index 581a688912..34110f374e 100644
--- a/amber/src/main/python/core/models/type/large_binary.py
+++ b/amber/src/main/python/core/models/type/large_binary.py
@@ -63,9 +63,9 @@ class largebinary:
"""
if uri is None:
# Lazy import to avoid circular dependencies
- from pytexera.storage import large_binary_manager
+ from pytexera.storage.large_binary_manager import
LargeBinaryManager
- uri = large_binary_manager.create()
+ uri = LargeBinaryManager().create()
if not uri.startswith("s3://"):
raise ValueError(f"largebinary URI must start with 's3://', got:
{uri}")
diff --git a/amber/src/main/python/core/storage/storage_config.py
b/amber/src/main/python/core/storage/storage_config.py
index 8233590987..05e8c3ee16 100644
--- a/amber/src/main/python/core/storage/storage_config.py
+++ b/amber/src/main/python/core/storage/storage_config.py
@@ -41,6 +41,9 @@ class StorageConfig:
S3_REGION = None
S3_AUTH_USERNAME = None
S3_AUTH_PASSWORD = None
+ # Execution-scoped base URI (s3://bucket/objects/{eid}/) for this worker's
large
+ # binaries; fixed at process init, which assumes one process per execution.
+ S3_LARGE_BINARIES_BASE_URI = None
@classmethod
def initialize(
@@ -59,6 +62,7 @@ class StorageConfig:
s3_region,
s3_auth_username,
s3_auth_password,
+ s3_large_binaries_base_uri,
):
if cls._initialized:
raise RuntimeError(
@@ -82,6 +86,7 @@ class StorageConfig:
cls.S3_REGION = s3_region
cls.S3_AUTH_USERNAME = s3_auth_username
cls.S3_AUTH_PASSWORD = s3_auth_password
+ cls.S3_LARGE_BINARIES_BASE_URI = s3_large_binaries_base_uri
cls._initialized = True
diff --git
a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
index 68368c5c12..f17373c449 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
@@ -60,9 +60,9 @@ class LargeBinaryInputStream(IOBase):
def _lazy_init(self):
"""Download from S3 on first read operation."""
- from pytexera.storage import large_binary_manager
+ from pytexera.storage.large_binary_manager import LargeBinaryManager
- s3 = large_binary_manager._get_s3_client()
+ s3 = LargeBinaryManager()._get_s3_client()
response = s3.get_object(
Bucket=self._large_binary.get_bucket_name(),
Key=self._large_binary.get_object_key(),
diff --git a/amber/src/main/python/pytexera/storage/large_binary_manager.py
b/amber/src/main/python/pytexera/storage/large_binary_manager.py
index e061eac622..b323c676d5 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_manager.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_manager.py
@@ -22,57 +22,83 @@ Users should not interact with this module directly. Use
largebinary() construct
and LargeBinaryInputStream/LargeBinaryOutputStream instead.
"""
-import time
+import threading
import uuid
from loguru import logger
from core.storage.storage_config import StorageConfig
-# Module-level state
-_s3_client = None
-DEFAULT_BUCKET = "texera-large-binaries"
+class LargeBinaryManager:
+ """Manages large binaries in S3 for a worker process.
-def _get_s3_client():
- """Get or initialize S3 client (lazy initialization, cached)."""
- global _s3_client
- if _s3_client is None:
- try:
- import boto3
- from botocore.config import Config
- except ImportError as e:
- raise RuntimeError("boto3 required. Install with: pip install
boto3") from e
+ A singleton, so the cached S3 client is shared process-wide. create()
appends a
+ unique suffix to an execution-scoped base URI handed down by the
controller as
+ process config (``StorageConfig.S3_LARGE_BINARIES_BASE_URI``); the worker
never
+ holds an execution id. This is the Python counterpart of the JVM
+ ``LargeBinaryManager``, which uses a thread-local instead because one JVM
process
+ runs many workers across executions (a Python worker is one process per
execution).
+ """
- _s3_client = boto3.client(
- "s3",
- endpoint_url=StorageConfig.S3_ENDPOINT,
- aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
- aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
- region_name=StorageConfig.S3_REGION,
- config=Config(signature_version="s3v4", s3={"addressing_style":
"path"}),
- )
- return _s3_client
+ _instance = None
+ # Guards singleton creation and S3-client init; reached from the operator
and upload
+ # threads.
+ _lock = threading.Lock()
+ def __new__(cls):
+ # Double-checked locking: skip the lock once the instance exists.
+ if cls._instance is None:
+ with cls._lock:
+ if cls._instance is None:
+ instance = super().__new__(cls)
+ instance._s3_client = None
+ cls._instance = instance
+ return cls._instance
-def _ensure_bucket_exists(bucket: str):
- """Ensure S3 bucket exists, creating it if necessary."""
- s3 = _get_s3_client()
- try:
- s3.head_bucket(Bucket=bucket)
- except s3.exceptions.NoSuchBucket:
- logger.debug(f"Bucket {bucket} not found, creating it")
- s3.create_bucket(Bucket=bucket)
- logger.info(f"Created bucket: {bucket}")
+ def _get_s3_client(self):
+ """Get or initialize the S3 client (lazy initialization, cached)."""
+ if self._s3_client is None:
+ with self._lock:
+ if self._s3_client is None:
+ try:
+ import boto3
+ from botocore.config import Config
+ except ImportError as e:
+ raise RuntimeError(
+ "boto3 required. Install with: pip install boto3"
+ ) from e
+ self._s3_client = boto3.client(
+ "s3",
+ endpoint_url=StorageConfig.S3_ENDPOINT,
+ aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
+ aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
+ region_name=StorageConfig.S3_REGION,
+ config=Config(
+ signature_version="s3v4", s3={"addressing_style":
"path"}
+ ),
+ )
+ return self._s3_client
-def create() -> str:
- """
- Creates a new largebinary reference with a unique S3 URI.
+ def _ensure_bucket_exists(self, bucket: str):
+ """Ensure the S3 bucket exists, creating it if necessary."""
+ s3 = self._get_s3_client()
+ try:
+ s3.head_bucket(Bucket=bucket)
+ except s3.exceptions.NoSuchBucket:
+ logger.debug(f"Bucket {bucket} not found, creating it")
+ s3.create_bucket(Bucket=bucket)
+ logger.info(f"Created bucket: {bucket}")
- Returns:
- S3 URI string (format: s3://bucket/key)
- """
- _ensure_bucket_exists(DEFAULT_BUCKET)
- timestamp_ms = int(time.time() * 1000)
- unique_id = uuid.uuid4()
- object_key = f"objects/{timestamp_ms}/{unique_id}"
- return f"s3://{DEFAULT_BUCKET}/{object_key}"
+ def create(self) -> str:
+ """Append a unique suffix to the controller-provided base URI.
+
+ Pure string construction (no S3 round-trip); the bucket is created on
demand at
+ upload time. Returns e.g.
``s3://bucket/objects/{execution_id}/{uuid}``.
+ """
+ base_uri = StorageConfig.S3_LARGE_BINARIES_BASE_URI
+ if not base_uri:
+ raise RuntimeError(
+ "largebinary() requires a large-binaries base URI, but none is
"
+ "configured (StorageConfig.S3_LARGE_BINARIES_BASE_URI is
unset)."
+ )
+ return f"{base_uri}{uuid.uuid4()}"
diff --git
a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
index 0cdf8a3679..63d4caeb6b 100644
--- a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
+++ b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
@@ -29,7 +29,7 @@ Usage:
from typing import Optional, Union
from io import IOBase
from core.models.type.large_binary import largebinary
-from pytexera.storage import large_binary_manager
+from pytexera.storage.large_binary_manager import LargeBinaryManager
import threading
import queue
@@ -153,14 +153,25 @@ class LargeBinaryOutputStream(IOBase):
if self._upload_thread is None:
def upload_worker():
+ s3 = None
try:
-
large_binary_manager._ensure_bucket_exists(self._bucket_name)
- s3 = large_binary_manager._get_s3_client()
+ manager = LargeBinaryManager()
+ manager._ensure_bucket_exists(self._bucket_name)
+ s3 = manager._get_s3_client()
reader = _QueueReader(self._queue)
s3.upload_fileobj(reader, self._bucket_name,
self._object_key)
except Exception as e:
+ # Record the failure first so the next write() call can
+ # immediately raise, then best-effort clean up the object.
with self._lock:
self._upload_exception = e
+ if s3 is not None:
+ try:
+ s3.delete_object(
+ Bucket=self._bucket_name, Key=self._object_key
+ )
+ except Exception:
+ pass
finally:
self._upload_complete.set()
@@ -214,12 +225,10 @@ class LargeBinaryOutputStream(IOBase):
self._upload_thread.join()
self._upload_complete.wait()
- # Check for errors and cleanup if needed
with self._lock:
exception = self._upload_exception
if exception is not None:
- self._cleanup_failed_upload()
raise IOError(
f"Failed to complete upload: {exception}"
) from exception
@@ -228,15 +237,6 @@ class LargeBinaryOutputStream(IOBase):
# the second close() call on Python 3.13+.
super().close()
- def _cleanup_failed_upload(self):
- """Clean up a failed upload by deleting the S3 object."""
- try:
- s3 = large_binary_manager._get_s3_client()
- s3.delete_object(Bucket=self._bucket_name, Key=self._object_key)
- except Exception:
- # Ignore cleanup errors - we're already handling an upload failure
- pass
-
def __enter__(self):
"""Context manager entry."""
return self
diff --git a/amber/src/main/python/texera_run_python_worker.py
b/amber/src/main/python/texera_run_python_worker.py
index f1ba8c7db6..5c3e25e096 100644
--- a/amber/src/main/python/texera_run_python_worker.py
+++ b/amber/src/main/python/texera_run_python_worker.py
@@ -70,6 +70,7 @@ if __name__ == "__main__":
s3_region,
s3_auth_username,
s3_auth_password,
+ s3_large_binaries_base_uri,
) = sys.argv
init_loguru_logger(logger_level)
StorageConfig.initialize(
@@ -87,6 +88,7 @@ if __name__ == "__main__":
s3_region,
s3_auth_username,
s3_auth_password,
+ s3_large_binaries_base_uri,
)
# Setting R_HOME environment variable for R-UDF usage
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
index b8bb3a92b6..c7ed3b6324 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
@@ -209,7 +209,8 @@ class PythonWorkflowWorker(
StorageConfig.s3Endpoint,
StorageConfig.s3Region,
StorageConfig.s3Username,
- StorageConfig.s3Password
+ StorageConfig.s3Password,
+ workerConfig.largeBinaryBaseUri
)
).run(BasicIO.standard(false))
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala
index bb8f3d5775..8166dee6dd 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala
@@ -23,6 +23,7 @@ import org.apache.texera.amber.config.ApplicationConfig
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
import org.apache.texera.amber.core.workflow.PhysicalOp
import org.apache.texera.amber.util.VirtualIdentityUtils
+import org.apache.texera.service.util.LargeBinaryManager
case object WorkerConfig {
def generateWorkerConfigs(
@@ -45,7 +46,8 @@ case object WorkerConfig {
WorkerConfig(
VirtualIdentityUtils.createWorkerIdentity(physicalOp.workflowId,
physicalOp.id, idx),
pveName = physicalOp.pveName,
- cuid = cuid
+ cuid = cuid,
+ largeBinaryBaseUri =
LargeBinaryManager.baseUriForExecution(physicalOp.executionId.id)
)
)
}
@@ -54,5 +56,8 @@ case object WorkerConfig {
case class WorkerConfig(
workerId: ActorVirtualIdentity,
pveName: String = "",
- cuid: Option[Int] = None
+ cuid: Option[Int] = None,
+ // Controller-named, execution-scoped base URI under which this worker's
large binaries
+ // live; create() appends a unique suffix. Empty when large binaries are
unconfigured.
+ largeBinaryBaseUri: String = ""
)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala
index 0c136d613a..fd4d7786c8 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala
@@ -39,6 +39,7 @@ import org.apache.texera.amber.engine.common.ambermessage.{
}
import org.apache.texera.amber.engine.common.virtualidentity.util.SELF
import org.apache.texera.amber.error.ErrorUtils.safely
+import org.apache.texera.service.util.LargeBinaryManager
import java.util.concurrent._
@@ -46,7 +47,10 @@ class DPThread(
val actorId: ActorVirtualIdentity,
dp: DataProcessor,
logManager: ReplayLogManager,
- internalQueue: LinkedBlockingQueue[DPInputQueueElement]
+ internalQueue: LinkedBlockingQueue[DPInputQueueElement],
+ // Controller-named, execution-scoped base URI for large binaries created
on this
+ // thread (empty when unconfigured). Seeded into LargeBinaryManager at
thread start.
+ largeBinaryBaseUri: String = ""
) extends AmberLogging {
// initialize dp thread upon construction
@@ -91,6 +95,10 @@ class DPThread(
dpThread = dpThreadExecutor.submit(new Runnable() {
def run(): Unit = {
Thread.currentThread().setName(getThreadName)
+ // Seed this thread's large-binary base URI (from WorkerConfig)
before any tuple,
+ // so create() can append a suffix without the execution id. Once
per thread,
+ // assuming a thread serves one execution.
+ LargeBinaryManager.setCurrentBaseUri(largeBinaryBaseUri)
logger.info("DP thread started")
startFuture.complete(())
dp.statisticsManager.initializeWorkerStartTime(System.nanoTime())
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala
index d1a0a300d9..13b1d94b04 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala
@@ -107,7 +107,13 @@ class WorkflowWorker(
)
}
// dp is ready
- dpThread = new DPThread(workerConfig.workerId, dp, logManager, inputQueue)
+ dpThread = new DPThread(
+ workerConfig.workerId,
+ dp,
+ logManager,
+ inputQueue,
+ workerConfig.largeBinaryBaseUri
+ )
dpThread.start()
}
diff --git
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
index cb910d11c3..ee559048c9 100644
---
a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
+++
b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
@@ -611,8 +611,6 @@ class WorkflowResource extends LazyLogging {
.asScala
.toList
- LargeBinaryManager.deleteAllObjects()
-
// Collect all URIs related to executions for cleanup
val uris = eids.flatMap { eid =>
val executionId = ExecutionIdentity(eid.longValue())
@@ -638,6 +636,10 @@ class WorkflowResource extends LazyLogging {
}
}
+ // Delete large binaries for each execution belonging to the workflows
being
+ // removed. Done after the transaction (like the document cleanup below).
+ eids.foreach(eid =>
LargeBinaryManager.deleteByExecution(eid.longValue()))
+
// Clean up document storage
try {
uris.foreach { uri =>
diff --git
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
index 0044c95967..c18b8b50e8 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
@@ -318,7 +318,7 @@ class WorkflowService(
* 2. Clears URI references from the execution registry
* 3. Safely clears all result and console message documents
* 4. Expires Iceberg snapshots for runtime statistics
- * 5. Deletes large binaries from MinIO
+ * 5. Deletes this execution's large binaries from MinIO
*
* @param eid The execution identity to clean up resources for
*/
@@ -355,7 +355,7 @@ class WorkflowService(
logger.debug(s"Error processing document at $uri:
${error.getMessage}")
}
}
- // Delete large binaries
- LargeBinaryManager.deleteAllObjects()
+ // Delete this execution's large binaries
+ LargeBinaryManager.deleteByExecution(eid.id)
}
}
diff --git
a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
index 9d1fd30698..cfc4f7f676 100644
---
a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
+++
b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py
@@ -108,6 +108,7 @@ def sqlite_iceberg_catalog():
s3_region="unused",
s3_auth_username="unused",
s3_auth_password="unused",
+ s3_large_binaries_base_uri="s3://texera-large-binaries/objects/0/",
)
original_instance = IcebergCatalogInstance._instance
diff --git a/amber/src/test/python/core/models/type/test_large_binary.py
b/amber/src/test/python/core/models/type/test_large_binary.py
index 36310e1dd5..56348f4265 100644
--- a/amber/src/test/python/core/models/type/test_large_binary.py
+++ b/amber/src/test/python/core/models/type/test_large_binary.py
@@ -18,6 +18,10 @@
import pytest
from unittest.mock import patch
from core.models.type.large_binary import largebinary
+from pytexera.storage.large_binary_manager import LargeBinaryManager
+
+# The manager is a singleton; bind the shared instance for the tests.
+large_binary_manager = LargeBinaryManager()
class TestLargeBinary:
@@ -31,7 +35,7 @@ class TestLargeBinary:
def test_create_without_uri(self):
"""Test creating largebinary without URI (calls
large_binary_manager.create)."""
- with patch("pytexera.storage.large_binary_manager.create") as
mock_create:
+ with patch.object(large_binary_manager, "create") as mock_create:
mock_create.return_value = "s3://bucket/objects/123/uuid"
large_binary = largebinary()
assert large_binary.uri == "s3://bucket/objects/123/uuid"
diff --git
a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
index bdba00ba0c..5e61b4ed68 100644
--- a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
@@ -57,6 +57,7 @@ StorageConfig.initialize(
s3_region="us-east-1",
s3_auth_username="minioadmin",
s3_auth_password="minioadmin",
+ s3_large_binaries_base_uri="s3://texera-large-binaries/objects/0/",
)
diff --git
a/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py
b/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py
index 35bc5bc634..7e6a401aef 100644
--- a/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py
+++ b/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py
@@ -20,7 +20,10 @@ from unittest.mock import patch, MagicMock
from io import BytesIO
from core.models.type.large_binary import largebinary
from pytexera.storage.large_binary_input_stream import LargeBinaryInputStream
-from pytexera.storage import large_binary_manager
+from pytexera.storage.large_binary_manager import LargeBinaryManager
+
+# The manager is a singleton; bind the shared instance for the tests.
+large_binary_manager = LargeBinaryManager()
class TestLargeBinaryInputStream:
diff --git
a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
index 1942e91f8b..e1da5eee18 100644
--- a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
+++ b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py
@@ -15,34 +15,44 @@
# specific language governing permissions and limitations
# under the License.
+import re
+
import pytest
from unittest.mock import patch, MagicMock
-from pytexera.storage import large_binary_manager
+from pytexera.storage.large_binary_manager import LargeBinaryManager
from core.storage.storage_config import StorageConfig
+# The manager is a singleton; bind the shared instance for the tests.
+large_binary_manager = LargeBinaryManager()
+
+# Execution-scoped base URI the controller hands down; create() appends a
unique suffix.
+TEST_BASE_URI = "s3://texera-large-binaries/objects/1/"
+
+
+@pytest.fixture(autouse=True)
+def _init_storage_config():
+ """Initialize StorageConfig (incl. the large-binaries base URI) for every
test."""
+ if not StorageConfig._initialized:
+ StorageConfig.initialize(
+ catalog_type="postgres",
+ postgres_uri_without_scheme="localhost:5432/test",
+ postgres_username="test",
+ postgres_password="test",
+ rest_catalog_uri="http://localhost:8181/catalog/",
+ rest_catalog_warehouse_name="texera",
+ table_result_namespace="test",
+ table_state_namespace="test-state",
+ directory_path="/tmp/test",
+ commit_batch_size=1000,
+ s3_endpoint="http://localhost:9000",
+ s3_region="us-east-1",
+ s3_auth_username="minioadmin",
+ s3_auth_password="minioadmin",
+ s3_large_binaries_base_uri=TEST_BASE_URI,
+ )
-class TestLargeBinaryManager:
- @pytest.fixture(autouse=True)
- def setup_storage_config(self):
- """Initialize StorageConfig for tests."""
- if not StorageConfig._initialized:
- StorageConfig.initialize(
- catalog_type="postgres",
- postgres_uri_without_scheme="localhost:5432/test",
- postgres_username="test",
- postgres_password="test",
- rest_catalog_uri="http://localhost:8181/catalog/",
- rest_catalog_warehouse_name="texera",
- table_result_namespace="test",
- table_state_namespace="test-state",
- directory_path="/tmp/test",
- commit_batch_size=1000,
- s3_endpoint="http://localhost:9000",
- s3_region="us-east-1",
- s3_auth_username="minioadmin",
- s3_auth_password="minioadmin",
- )
+class TestLargeBinaryManager:
def test_get_s3_client_initializes_once(self):
"""Test that S3 client is initialized and cached."""
# Reset the client
@@ -118,37 +128,48 @@ class TestLargeBinaryManager:
mock_client.head_bucket.assert_called_once_with(Bucket="test-bucket")
mock_client.create_bucket.assert_called_once_with(Bucket="test-bucket")
- def test_create_generates_unique_uri(self):
- """Test that create() generates a unique S3 URI."""
- large_binary_manager._s3_client = None
-
- with patch("boto3.client") as mock_boto3_client:
- mock_client = MagicMock()
- mock_boto3_client.return_value = mock_client
- mock_client.head_bucket.return_value = None
- mock_client.exceptions.NoSuchBucket = type("NoSuchBucket",
(Exception,), {})
-
- uri = large_binary_manager.create()
-
- # Check URI format
- assert uri.startswith("s3://")
- assert
uri.startswith(f"s3://{large_binary_manager.DEFAULT_BUCKET}/")
- assert "objects/" in uri
-
- # Verify bucket was checked/created
- mock_client.head_bucket.assert_called_once_with(
- Bucket=large_binary_manager.DEFAULT_BUCKET
- )
-
- def test_create_uses_default_bucket(self):
- """Test that create() uses the default bucket."""
- large_binary_manager._s3_client = None
-
- with patch("boto3.client") as mock_boto3_client:
- mock_client = MagicMock()
- mock_boto3_client.return_value = mock_client
- mock_client.head_bucket.return_value = None
- mock_client.exceptions.NoSuchBucket = type("NoSuchBucket",
(Exception,), {})
-
- uri = large_binary_manager.create()
- assert large_binary_manager.DEFAULT_BUCKET in uri
+ def test_create_appends_unique_suffix_to_base_uri(self):
+ """create() returns the configured base URI plus a unique suffix (no
S3 call)."""
+ base = StorageConfig.S3_LARGE_BINARIES_BASE_URI
+
+ uri1 = large_binary_manager.create()
+ uri2 = large_binary_manager.create()
+
+ assert uri1.startswith(base)
+ assert uri2.startswith(base)
+ # A non-empty, unique suffix follows the base URI.
+ assert uri1 != base
+ assert uri1 != uri2
+
+
+def test_create_matches_execution_scoped_key_shape(monkeypatch):
+ # The base URI is execution-scoped (controller-named); create() only
appends a uuid.
+ monkeypatch.setattr(
+ StorageConfig,
+ "S3_LARGE_BINARIES_BASE_URI",
+ "s3://texera-large-binaries/objects/42/",
+ )
+ uri = large_binary_manager.create()
+ assert
re.fullmatch(r"s3://texera-large-binaries/objects/42/[0-9a-fA-F-]+", uri)
+
+
+def test_create_without_base_uri_raises(monkeypatch):
+ # An unconfigured base URI should fail with a clear error, not a cryptic
S3 one.
+ monkeypatch.setattr(StorageConfig, "S3_LARGE_BINARIES_BASE_URI", None)
+ with pytest.raises(RuntimeError):
+ large_binary_manager.create()
+
+
+def test_largebinarymanager_is_a_singleton():
+ # Constructing the manager always returns the same shared instance.
+ assert LargeBinaryManager() is LargeBinaryManager()
+
+ # State (the cached S3 client) is shared across handles (same instance).
+ mgr = LargeBinaryManager()
+ original = mgr._s3_client
+ sentinel = object()
+ mgr._s3_client = sentinel
+ try:
+ assert LargeBinaryManager()._s3_client is sentinel
+ finally:
+ mgr._s3_client = original
diff --git
a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
index 17725d9c66..dc8881ab16 100644
--- a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
+++ b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py
@@ -24,7 +24,10 @@ from pytexera.storage.large_binary_output_stream import (
LargeBinaryOutputStream,
_QueueReader,
)
-from pytexera.storage import large_binary_manager
+from pytexera.storage.large_binary_manager import LargeBinaryManager
+
+# The manager is a singleton; bind the shared instance for the tests.
+large_binary_manager = LargeBinaryManager()
class TestLargeBinaryOutputStream:
@@ -248,8 +251,8 @@ class TestLargeBinaryOutputStream:
stream.close()
-class TestCleanupFailedUpload:
- """Direct unit tests for _cleanup_failed_upload's silent-swallow path."""
+class TestUploadWorkerCleanup:
+ """Direct unit tests for the upload worker's silent-swallow cleanup."""
@pytest.fixture
def large_binary(self):
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala
index 25ef739251..31ab5c8e1a 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala
@@ -31,6 +31,7 @@ import org.apache.texera.amber.core.virtualidentity.{
}
import org.apache.texera.amber.core.workflow._
import
org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._
+import org.apache.texera.service.util.LargeBinaryManager
import org.scalatest.flatspec.AnyFlatSpec
import java.net.URI
@@ -273,11 +274,15 @@ class SchedulingConfigsSpec extends AnyFlatSpec {
// WorkerConfig.generateWorkerConfigs
//
---------------------------------------------------------------------------
- private def physicalOp(parallelizable: Boolean, suggested: Option[Int]):
PhysicalOp =
+ private def physicalOp(
+ parallelizable: Boolean,
+ suggested: Option[Int],
+ executionId: ExecutionIdentity = ExecutionIdentity(0)
+ ): PhysicalOp =
PhysicalOp(
PhysicalOpIdentity(OperatorIdentity("op"), "main"),
WorkflowIdentity(0),
- ExecutionIdentity(0),
+ executionId,
OpExecInitInfo.Empty,
parallelizable = parallelizable,
suggestedWorkerNum = suggested
@@ -308,4 +313,14 @@ class SchedulingConfigsSpec extends AnyFlatSpec {
WorkerConfig.generateWorkerConfigs(physicalOp(parallelizable = true,
suggested = None))
assert(configs.size == ApplicationConfig.numWorkerPerOperatorByDefault)
}
+
+  // Each generated worker config must carry the execution-scoped base URI that
+  // LargeBinaryManager.baseUriForExecution computes for the operator's execution id.
+  it should "set largeBinaryBaseUri to the execution-scoped base URI for every worker" in {
+    val scopedExecution = ExecutionIdentity(42L)
+    val scopedOp =
+      physicalOp(parallelizable = true, suggested = Some(3), executionId = scopedExecution)
+    val workerConfigs = WorkerConfig.generateWorkerConfigs(scopedOp)
+    val expectedBaseUri = LargeBinaryManager.baseUriForExecution(scopedExecution.id)
+    // The base URI itself has to contain the objects/{eid}/ segment.
+    assert(expectedBaseUri.contains(s"objects/${scopedExecution.id}/"))
+    // No worker may deviate from the expected base URI.
+    assert(!workerConfigs.exists(_.largeBinaryBaseUri != expectedBaseUri))
+  }
}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala
index d8b5d57d63..eb1241182e 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala
@@ -42,11 +42,12 @@ import
org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFI
import
org.apache.texera.amber.engine.common.rpc.AsyncRPCClient.ControlInvocation
import org.apache.texera.amber.engine.common.storage.SequentialRecordStorage
import org.apache.texera.amber.engine.common.virtualidentity.util.SELF
+import org.apache.texera.service.util.LargeBinaryManager
import org.scalamock.scalatest.MockFactory
import org.scalatest.flatspec.AnyFlatSpec
import java.net.URI
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{CompletableFuture, LinkedBlockingQueue, TimeUnit}
class DPThreadSpec extends AnyFlatSpec with MockFactory {
@@ -240,4 +241,33 @@ class DPThreadSpec extends AnyFlatSpec with MockFactory {
assert(logs.length > 1)
}
+ // Checks that a base URI handed to the DPThread constructor is the one
+ // LargeBinaryManager.create() builds on when it is called from inside processTuple.
+ "DP Thread" should "seed the base URI so create() yields execution-scoped
keys" in {
+ val eid = 7777L
+ val baseUri = LargeBinaryManager.baseUriForExecution(eid)
+ // create() runs on the DP thread; capture what it produces there.
+ val capturedUri = new CompletableFuture[String]()
+ val inputQueue = new LinkedBlockingQueue[DPInputQueueElement]()
+ val dp = new DataProcessor(workerId, x => {}, inputMessageQueue =
inputQueue)
+ // Probe executor: emits no output tuples; it only completes the future with the URI
+ // that create() returned.
+ dp.executor = new OperatorExecutor {
+ override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike]
= {
+ capturedUri.complete(LargeBinaryManager.create())
+ Iterator.empty
+ }
+ }
+ dp.inputManager.addPort(mockInputPortId, schema, List.empty, List.empty)
+ dp.inputGateway.getChannel(dataChannelId).setPortId(mockInputPortId)
+ dp.adaptiveBatchingMonitor = mock[WorkerTimerService]
+ // Allow resumeAdaptiveBatching to be called any number of times on the mock.
+ (dp.adaptiveBatchingMonitor.resumeAdaptiveBatching
_).expects().anyNumberOfTimes()
+ // The base URI under test is passed as the last DPThread constructor argument.
+ val dpThread = new DPThread(workerId, dp, logManager, inputQueue, baseUri)
+ dpThread.start()
+ // Enqueue a single-tuple data frame so the executor's processTuple gets invoked.
+ inputQueue.put(
+ FIFOMessageElement(WorkflowFIFOMessage(dataChannelId, 0,
DataFrame(Array(tuples(0)))))
+ )
+
+ // Bounded wait: get() throws a TimeoutException after 5 seconds if the future was
+ // never completed, so the test fails instead of hanging.
+ val uri = capturedUri.get(5, TimeUnit.SECONDS)
+
assert(uri.startsWith(s"s3://${LargeBinaryManager.DEFAULT_BUCKET}/objects/$eid/"))
+ // a unique suffix is appended to the execution-scoped base URI
+ assert(uri.length > baseUri.length)
+ }
+
}
diff --git a/common/config/src/main/resources/storage.conf
b/common/config/src/main/resources/storage.conf
index 0682109f19..12a9919e04 100644
--- a/common/config/src/main/resources/storage.conf
+++ b/common/config/src/main/resources/storage.conf
@@ -115,6 +115,10 @@ storage {
region = "us-west-2"
region = ${?STORAGE_S3_REGION}
+ # Bucket for large binaries; the JVM workers, the Python workers, and cleanup all use it.
+ large-binaries-bucket = "texera-large-binaries"
+ large-binaries-bucket = ${?STORAGE_S3_LARGE_BINARIES_BUCKET}
+
multipart {
part-size = "16MB"
part-size = ${?STORAGE_S3_MULTIPART_PART_SIZE}
diff --git
a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
index 07447cfdbe..c88541cf1b 100644
---
a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
+++
b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
@@ -80,6 +80,7 @@ object StorageConfig {
val s3Region: String = conf.getString("storage.s3.region")
val s3Username: String = conf.getString("storage.s3.auth.username")
val s3Password: String = conf.getString("storage.s3.auth.password")
+ val s3LargeBinariesBucket: String =
conf.getString("storage.s3.large-binaries-bucket")
val s3MultipartUploadPartSize: Long = parseSizeStringToBytes(
conf.getString("storage.s3.multipart.part-size")
)
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
index 44db3929f2..2fa4acb530 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala
@@ -20,44 +20,88 @@
package org.apache.texera.service.util
import com.typesafe.scalalogging.LazyLogging
+import org.apache.texera.amber.config.StorageConfig
import java.util.UUID
-/**
- * Manages the lifecycle of LargeBinaries stored in S3.
- *
- * Handles creation and deletion of large objects that exceed
- * normal tuple size limits.
- */
+/** Manages the lifecycle of LargeBinaries (objects too large for normal
tuples) in S3. */
object LargeBinaryManager extends LazyLogging {
- val DEFAULT_BUCKET: String = "texera-large-binaries"
+ // From config so the JVM/Python workers and cleanup all share one bucket.
+ val DEFAULT_BUCKET: String = StorageConfig.s3LargeBinariesBucket
+
+ /** Per-execution key prefix; the single source of truth for the write and
delete paths. */
+ private def executionPrefix(executionId: Long): String =
s"objects/$executionId"
/**
- * Creates a new LargeBinary reference.
- * The actual data upload happens separately via LargeBinaryOutputStream.
+ * Base URI (trailing slash) under which `executionId`'s large binaries
live; create()
+ * appends a unique suffix. Empty when the bucket is unconfigured, so
create() fails loudly.
*
- * @return S3 URI string for the new LargeBinary (format: s3://bucket/key)
+ * `executionId` must be a persisted EID. The sentinel id
(DEFAULT_EXECUTION_ID = 1) shares
+ * this space, so binaries must only be created under a real execution —
else execution 1
+ * and a default-context run would collide under objects/1/.
*/
- def create(): String = {
- val objectKey =
s"objects/${System.currentTimeMillis()}/${UUID.randomUUID()}"
- val uri = s"s3://$DEFAULT_BUCKET/$objectKey"
+ def baseUriForExecution(executionId: Long): String =
+ if (DEFAULT_BUCKET.isEmpty) ""
+ else s"s3://$DEFAULT_BUCKET/${executionPrefix(executionId)}/"
+
+  /**
+   * Per-thread base URI that create() prefixes onto every new LargeBinary key. It is a
+   * ThreadLocal because create() runs on the DP thread; a thread that was never seeded
+   * holds None. The DP thread is expected to seed it once at start-up, on the assumption
+   * that a thread serves a single execution; if workers are ever reused across
+   * executions they must re-seed.
+   */
+  private val currentBaseUri: ThreadLocal[Option[String]] =
+    ThreadLocal.withInitial[Option[String]](() => None)
+
+  /**
+   * Seeds the calling thread's base URI.
+   *
+   * @param baseUri base URI for later create() calls on this thread; a null or empty
+   *                string clears the value, so create() fails loudly
+   */
+  def setCurrentBaseUri(baseUri: String): Unit = {
+    val seeded = if (baseUri == null || baseUri.isEmpty) None else Some(baseUri)
+    currentBaseUri.set(seeded)
+  }
- uri
+  /**
+   * Mints the URI for a new LargeBinary: the calling thread's base URI followed by a
+   * random UUID. Only the reference is produced here; the data is uploaded separately
+   * via LargeBinaryOutputStream.
+   *
+   * @return the new URI, e.g. s3://bucket/objects/{eid}/{uuid}
+   * @throws IllegalStateException if no base URI has been set on the calling thread
+   */
+  def create(): String = {
+    currentBaseUri.get() match {
+      case Some(threadBaseUri) => threadBaseUri + UUID.randomUUID()
+      case None =>
+        throw new IllegalStateException(
+          "LargeBinaryManager.create() requires a base URI, " +
+            "but none was set on the current thread."
+        )
+    }
   }
/**
- * Deletes all large binaries from the bucket.
+ * Deletes all large binaries for one execution. Uses deleteDirectory,
which removes one
+ * listing page (<= 1000 objects) — enough for expected counts; more needs
a paginated delete.
*
- * @throws java.lang.Exception if the deletion fails
- * @return Unit
+ * @param executionId the execution whose large binaries should be removed
*/
- def deleteAllObjects(): Unit = {
+ def deleteByExecution(executionId: Long): Unit =
+ deleteByExecution(executionId, S3StorageClient.deleteDirectory)
+
+ /** Overload taking the delete op as a parameter. Visible for testing. */
+ private[util] def deleteByExecution(
+ executionId: Long,
+ deleteDir: (String, String) => Unit
+ ): Unit = {
try {
- S3StorageClient.deleteDirectory(DEFAULT_BUCKET, "objects")
- logger.info(s"Successfully deleted all large binaries from bucket:
$DEFAULT_BUCKET")
+ deleteDir(DEFAULT_BUCKET, executionPrefix(executionId))
+ logger.info(
+ s"Deleted large binaries for execution $executionId from bucket:
$DEFAULT_BUCKET"
+ )
} catch {
+ // Swallowed: cleanup is a side effect of deletion and must not fail it.
Logged at
+ // error because a failure here silently leaks storage.
case e: Exception =>
- logger.warn(s"Failed to delete large binaries from bucket:
$DEFAULT_BUCKET", e)
+ logger.error(
+ s"Failed to delete large binaries for execution $executionId " +
+ s"from bucket: $DEFAULT_BUCKET",
+ e
+ )
}
}
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala
index 77d142efee..a0436e682e 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala
@@ -21,8 +21,22 @@ package org.apache.texera.service.util
import org.apache.texera.amber.core.tuple.LargeBinary
import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.BeforeAndAfterEach
-class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase {
+class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase with
BeforeAndAfterEach {
+
+ /** Execution id used by the bulk of the tests. */
+ private val TestExecutionId: Long = 9999L
+
+ /**
+ * Seeds the calling thread's base URI for execution `eid`, so that
+ * LargeBinaryManager.create() on this thread yields keys under that execution's prefix.
+ * NOTE(review): LargeBinaryManager documents this seeding as happening at DP-thread
+ * start in production (not in the controller) - confirm against the DPThread code.
+ */
+ private def setExecutionContext(eid: Long): Unit =
+
LargeBinaryManager.setCurrentBaseUri(LargeBinaryManager.baseUriForExecution(eid))
+
+ /** Each test creates large binaries; they need a base URI on the thread. */
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ setExecutionContext(TestExecutionId)
+ }
/** Creates a large binary from string data and returns it. */
private def createLargeBinary(data: String): LargeBinary = {
@@ -54,7 +68,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(stream.readAllBytes().sameElements(data.getBytes))
stream.close()
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should read exact number of bytes") {
@@ -67,7 +81,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(result.sameElements("0123456789".getBytes))
stream.close()
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should handle reading more bytes than
available") {
@@ -81,7 +95,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(result.sameElements(data.getBytes))
stream.close()
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should support standard single-byte read") {
@@ -94,7 +108,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(stream.read() == -1) // EOF
stream.close()
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should return -1 at EOF") {
@@ -105,7 +119,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(stream.read() == -1)
stream.close()
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should throw exception when reading from closed
stream") {
@@ -117,7 +131,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assertThrows[java.io.IOException](stream.read())
assertThrows[java.io.IOException](stream.readAllBytes())
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should handle multiple close calls") {
@@ -127,7 +141,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
stream.close()
stream.close() // Should not throw
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should read large data correctly") {
@@ -145,7 +159,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(result.sameElements(largeData))
stream.close()
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
// ========================================
@@ -183,7 +197,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
}
}
- test("LargeBinaryManager should delete all large binaries") {
+ test("deleteByExecution removes binaries written via new LargeBinary()") {
val pointer1 = new LargeBinary()
val out1 = new LargeBinaryOutputStream(pointer1)
try {
@@ -200,18 +214,18 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
out2.close()
}
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryManager should handle delete with no objects gracefully") {
- LargeBinaryManager.deleteAllObjects() // Should not throw exception
+ LargeBinaryManager.deleteByExecution(TestExecutionId) // Should not throw
exception
}
- test("LargeBinaryManager should delete all objects") {
+ test("deleteByExecution removes multiple binaries for the execution") {
val pointer1 = createLargeBinary("Test data")
val pointer2 = createLargeBinary("Test data")
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryManager should create bucket if it doesn't exist") {
@@ -219,7 +233,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assertStandardBucket(pointer)
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryManager should handle large objects correctly") {
@@ -237,7 +251,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
stream.close()
assert(readData.sameElements(largeData))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryManager should generate unique URIs for different objects")
{
@@ -261,7 +275,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(pointer1.getUri != pointer2.getUri)
assert(pointer1.getObjectKey != pointer2.getObjectKey)
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream should handle multiple reads from the same
large binary") {
@@ -279,7 +293,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(readData1.sameElements(data.getBytes))
assert(readData2.sameElements(data.getBytes))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryManager should properly parse bucket name and object key
from large binary") {
@@ -289,7 +303,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(largeBinary.getObjectKey.nonEmpty)
assert(!largeBinary.getObjectKey.startsWith("/"))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
// ========================================
@@ -309,7 +323,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assertStandardBucket(largeBinary)
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryInputStream constructor should read large binary contents")
{
@@ -322,7 +336,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(readData.sameElements(data.getBytes))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryOutputStream and LargeBinaryInputStream should work
together end-to-end") {
@@ -344,7 +358,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(readData.sameElements(data.getBytes))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
// ========================================
@@ -368,7 +382,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(readData.sameElements(data.getBytes))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryOutputStream should create large binary") {
@@ -381,7 +395,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assertStandardBucket(largeBinary)
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryOutputStream should handle large data correctly") {
@@ -399,7 +413,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(readData.sameElements(largeData))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryOutputStream should handle multiple writes") {
@@ -416,7 +430,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(readData.sameElements("Hello World!".getBytes))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryOutputStream should throw exception when writing to closed
stream") {
@@ -427,7 +441,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assertThrows[java.io.IOException](outStream.write("more".getBytes))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinaryOutputStream should handle close() being called multiple
times") {
@@ -437,7 +451,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
outStream.close()
outStream.close() // Should not throw
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("New LargeBinary() constructor should create unique URIs") {
@@ -447,7 +461,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(largeBinary1.getUri != largeBinary2.getUri)
assert(largeBinary1.getObjectKey != largeBinary2.getObjectKey)
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
}
test("LargeBinary() and LargeBinaryOutputStream API should be symmetric with
input") {
@@ -466,6 +480,47 @@ class LargeBinaryManagerSpec extends AnyFunSuite with
S3StorageTestBase {
assert(readData.sameElements(data.getBytes))
- LargeBinaryManager.deleteAllObjects()
+ LargeBinaryManager.deleteByExecution(TestExecutionId)
+ }
+
+  test("deleteByExecution removes only the target execution's binaries") {
+    // Take the bucket from the manager rather than repeating its default name: the bucket
+    // is configuration-driven, so a literal would inspect the wrong bucket when overridden.
+    val bucket = LargeBinaryManager.DEFAULT_BUCKET
+
+    // Setup runs inside the try so that a failure part-way through still reaches cleanup.
+    try {
+      // Create one binary under execution 1001 and another under 1002.
+      setExecutionContext(1001L)
+      createLargeBinary("data for 1001")
+      setExecutionContext(1002L)
+      createLargeBinary("data for 1002")
+
+      // Delete only execution 1001's binaries.
+      LargeBinaryManager.deleteByExecution(1001L)
+
+      assert(!S3StorageClient.directoryExists(bucket, "objects/1001"))
+      assert(S3StorageClient.directoryExists(bucket, "objects/1002"))
+    } finally {
+      // Keep the test self-contained: remove whatever either execution left behind
+      // (deleteByExecution logs and swallows failures, so it is safe to repeat) and reset
+      // the thread's base URI rather than relying on the next test's beforeEach.
+      LargeBinaryManager.deleteByExecution(1001L)
+      LargeBinaryManager.deleteByExecution(1002L)
+      setExecutionContext(TestExecutionId)
+    }
+  }
+
+  test("deleteByExecution distinguishes executions whose ids share a numeric prefix") {
+    // Take the bucket from the manager rather than repeating its default name: the bucket
+    // is configuration-driven, so a literal would inspect the wrong bucket when overridden.
+    val bucket = LargeBinaryManager.DEFAULT_BUCKET
+
+    // Setup runs inside the try so that a failure part-way through still reaches cleanup.
+    try {
+      // "objects/1" is a string prefix of "objects/11"; a prefix delete that dropped the
+      // trailing "/" would wipe both. Pin that deleting execution 1 leaves 11 untouched.
+      setExecutionContext(1L)
+      createLargeBinary("data for execution 1")
+      setExecutionContext(11L)
+      createLargeBinary("data for execution 11")
+
+      LargeBinaryManager.deleteByExecution(1L)
+
+      assert(!S3StorageClient.directoryExists(bucket, "objects/1"))
+      assert(S3StorageClient.directoryExists(bucket, "objects/11"))
+    } finally {
+      // Self-contained cleanup: remove whatever either execution left behind
+      // (deleteByExecution logs and swallows failures, so it is safe to repeat) and reset
+      // the thread's base URI for the remaining tests.
+      LargeBinaryManager.deleteByExecution(1L)
+      LargeBinaryManager.deleteByExecution(11L)
+      setExecutionContext(TestExecutionId)
+    }
+  }
}
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala
new file mode 100644
index 0000000000..d6268d29d3
--- /dev/null
+++
b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import org.scalatest.funsuite.AnyFunSuite
+
+/**
+ * Unit tests for [[LargeBinaryManager]] that need no live S3 endpoint: the directory-delete
+ * operation of `deleteByExecution` is injected, and `create()` is exercised on dedicated
+ * threads because it reads a thread-local base URI.
+ */
+class LargeBinaryManagerUnitSpec extends AnyFunSuite {
+
+  test("deleteByExecution issues a delete scoped to the execution's object prefix") {
+    // Record the (bucket, prefix) pair the manager hands to the injected delete op.
+    var recorded = Option.empty[(String, String)]
+    val recordingDelete: (String, String) => Unit =
+      (bucket, prefix) => recorded = Some(bucket -> prefix)
+
+    LargeBinaryManager.deleteByExecution(42L, recordingDelete)
+
+    assert(recorded == Some((LargeBinaryManager.DEFAULT_BUCKET, "objects/42")))
+  }
+
+  test("deleteByExecution swallows exceptions raised by the underlying delete") {
+    // A failing delete op must be absorbed by the manager, never rethrown to the caller.
+    val failingDelete: (String, String) => Unit =
+      (_, _) => throw new RuntimeException("boom")
+
+    LargeBinaryManager.deleteByExecution(7L, failingDelete)
+    succeed
+  }
+
+  test("create returns a URI under the current thread's base URI") {
+    // The base URI is thread-local, so seed it and call create() on a throwaway thread;
+    // nothing leaks into the thread that runs the other tests.
+    @volatile var createdUri: String = ""
+    val worker = new Thread(new Runnable {
+      override def run(): Unit = {
+        val executionBaseUri = LargeBinaryManager.baseUriForExecution(555L)
+        LargeBinaryManager.setCurrentBaseUri(executionBaseUri)
+        createdUri = LargeBinaryManager.create()
+      }
+    })
+    worker.start()
+    worker.join()
+
+    val expectedPrefix = s"s3://${LargeBinaryManager.DEFAULT_BUCKET}/objects/555/"
+    assert(createdUri.startsWith(expectedPrefix))
+    // something (the UUID suffix) must follow the execution-scoped prefix
+    assert(createdUri.length > expectedPrefix.length)
+  }
+
+  test("create throws when no base URI is set on the thread") {
+    // A brand-new thread has never been seeded, so create() has no base URI to build on.
+    @volatile var failure = Option.empty[Throwable]
+    val worker = new Thread(() => {
+      try LargeBinaryManager.create()
+      catch { case thrown: Throwable => failure = Some(thrown) }
+    })
+    worker.start()
+    worker.join()
+
+    assert(failure.exists {
+      case _: IllegalStateException => true
+      case _                        => false
+    })
+  }
+}