This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5280-75b46197d3584e0f211d5d3211bef571f5c61e9c in repository https://gitbox.apache.org/repos/asf/texera.git
commit 48e800e4458a74cc7bac5c41d8ac3e5eb0e2c7ea Author: Kunwoo (Chris) <[email protected]> AuthorDate: Sun Jun 7 21:43:57 2026 -0700 fix: scope large binary storage and cleanup by execution id (#5280) ### What changes were proposed in this PR? Large binaries were stored in the shared `texera-large-binaries` bucket under flat keys `objects/{timestamp}/{uuid}` with no execution id, and `clearExecutionResources(eid)` deleted all of them via `LargeBinaryManager.deleteAllObjects()`. Any cleanup for one execution therefore erased every other execution's (and user's) large binaries. This PR namespaces every large binary by its execution id and scopes deletion: - Object keys are now `objects/{eid}/{uuid}` on both the JVM and Python workers. - The execution-scoped location is named by the controller and handed to workers as data on `WorkerConfig` — no protobuf change. The controller computes the base URI `s3://{bucket}/objects/{eid}/`, and `create()` appends a unique suffix; the JVM seeds the base URI onto the data-processing thread at startup, and the Python worker receives it as a startup argument. The user-facing `largebinary()` / `new LargeBinary()` APIs are unchanged. - Cleanup uses the new `LargeBinaryManager.deleteByExecution(eid)` (prefix delete of `objects/{eid}/`). The JVM and Python engines share the same bucket and key shape, so this single JVM-side delete removes binaries created by both. - `deleteAllObjects()` is removed. Pre-existing objects under the old `objects/{timestamp}/...` scheme are left untouched. ### Any related issues, documentation, discussions? Closes #4123. ### How was this PR tested? Import the following JSON file to create two workflows (you can configure the source operator to use any kind of file you have), run them, and check that each execution creates 6 objects and that running or cleaning up one execution does not remove the other execution's large binary objects. 
[Large.Binary.Python (1).json](https://github.com/user-attachments/files/28369502/Large.Binary.Python.1.json) ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (Anthropic), models Claude Opus 4.8, Claude Opus 4.7, and Claude Sonnet 4.6 --------- Signed-off-by: Kunwoo (Chris) <[email protected]> Co-authored-by: Xiaozhen Liu <[email protected]> --- .../main/python/core/models/type/large_binary.py | 4 +- .../src/main/python/core/storage/storage_config.py | 5 + .../pytexera/storage/large_binary_input_stream.py | 4 +- .../pytexera/storage/large_binary_manager.py | 110 ++++++++++------- .../pytexera/storage/large_binary_output_stream.py | 7 +- amber/src/main/python/texera_run_python_worker.py | 2 + .../pythonworker/PythonWorkflowWorker.scala | 3 +- .../scheduling/config/WorkerConfig.scala | 9 +- .../engine/architecture/worker/DPThread.scala | 10 +- .../architecture/worker/WorkflowWorker.scala | 8 +- .../dashboard/user/workflow/WorkflowResource.scala | 6 +- .../texera/web/service/WorkflowService.scala | 6 +- .../packaging/test_state_materialization_e2e.py | 1 + .../python/core/models/type/test_large_binary.py | 6 +- .../core/storage/iceberg/test_iceberg_document.py | 1 + .../storage/test_large_binary_input_stream.py | 5 +- .../pytexera/storage/test_large_binary_manager.py | 133 ++++++++++++--------- .../storage/test_large_binary_output_stream.py | 5 +- .../scheduling/config/SchedulingConfigsSpec.scala | 19 ++- .../engine/architecture/worker/DPThreadSpec.scala | 32 ++++- common/config/src/main/resources/storage.conf | 4 + .../apache/texera/amber/config/StorageConfig.scala | 1 + .../texera/service/util/LargeBinaryManager.scala | 86 +++++++++---- .../service/util/LargeBinaryManagerSpec.scala | 115 +++++++++++++----- .../service/util/LargeBinaryManagerUnitSpec.scala | 72 +++++++++++ 25 files changed, 482 insertions(+), 172 deletions(-) diff --git a/amber/src/main/python/core/models/type/large_binary.py 
b/amber/src/main/python/core/models/type/large_binary.py index 581a688912..34110f374e 100644 --- a/amber/src/main/python/core/models/type/large_binary.py +++ b/amber/src/main/python/core/models/type/large_binary.py @@ -63,9 +63,9 @@ class largebinary: """ if uri is None: # Lazy import to avoid circular dependencies - from pytexera.storage import large_binary_manager + from pytexera.storage.large_binary_manager import LargeBinaryManager - uri = large_binary_manager.create() + uri = LargeBinaryManager().create() if not uri.startswith("s3://"): raise ValueError(f"largebinary URI must start with 's3://', got: {uri}") diff --git a/amber/src/main/python/core/storage/storage_config.py b/amber/src/main/python/core/storage/storage_config.py index 8233590987..05e8c3ee16 100644 --- a/amber/src/main/python/core/storage/storage_config.py +++ b/amber/src/main/python/core/storage/storage_config.py @@ -41,6 +41,9 @@ class StorageConfig: S3_REGION = None S3_AUTH_USERNAME = None S3_AUTH_PASSWORD = None + # Execution-scoped base URI (s3://bucket/objects/{eid}/) for this worker's large + # binaries; fixed at process init, which assumes one process per execution. 
+ S3_LARGE_BINARIES_BASE_URI = None @classmethod def initialize( @@ -59,6 +62,7 @@ class StorageConfig: s3_region, s3_auth_username, s3_auth_password, + s3_large_binaries_base_uri, ): if cls._initialized: raise RuntimeError( @@ -82,6 +86,7 @@ class StorageConfig: cls.S3_REGION = s3_region cls.S3_AUTH_USERNAME = s3_auth_username cls.S3_AUTH_PASSWORD = s3_auth_password + cls.S3_LARGE_BINARIES_BASE_URI = s3_large_binaries_base_uri cls._initialized = True diff --git a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py index 68368c5c12..f17373c449 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py +++ b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py @@ -60,9 +60,9 @@ class LargeBinaryInputStream(IOBase): def _lazy_init(self): """Download from S3 on first read operation.""" - from pytexera.storage import large_binary_manager + from pytexera.storage.large_binary_manager import LargeBinaryManager - s3 = large_binary_manager._get_s3_client() + s3 = LargeBinaryManager()._get_s3_client() response = s3.get_object( Bucket=self._large_binary.get_bucket_name(), Key=self._large_binary.get_object_key(), diff --git a/amber/src/main/python/pytexera/storage/large_binary_manager.py b/amber/src/main/python/pytexera/storage/large_binary_manager.py index e061eac622..b323c676d5 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_manager.py +++ b/amber/src/main/python/pytexera/storage/large_binary_manager.py @@ -22,57 +22,83 @@ Users should not interact with this module directly. Use largebinary() construct and LargeBinaryInputStream/LargeBinaryOutputStream instead. 
""" -import time +import threading import uuid from loguru import logger from core.storage.storage_config import StorageConfig -# Module-level state -_s3_client = None -DEFAULT_BUCKET = "texera-large-binaries" +class LargeBinaryManager: + """Manages large binaries in S3 for a worker process. -def _get_s3_client(): - """Get or initialize S3 client (lazy initialization, cached).""" - global _s3_client - if _s3_client is None: - try: - import boto3 - from botocore.config import Config - except ImportError as e: - raise RuntimeError("boto3 required. Install with: pip install boto3") from e + A singleton, so the cached S3 client is shared process-wide. create() appends a + unique suffix to an execution-scoped base URI handed down by the controller as + process config (``StorageConfig.S3_LARGE_BINARIES_BASE_URI``); the worker never + holds an execution id. This is the Python counterpart of the JVM + ``LargeBinaryManager``, which uses a thread-local instead because one JVM process + runs many workers across executions (a Python worker is one process per execution). + """ - _s3_client = boto3.client( - "s3", - endpoint_url=StorageConfig.S3_ENDPOINT, - aws_access_key_id=StorageConfig.S3_AUTH_USERNAME, - aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD, - region_name=StorageConfig.S3_REGION, - config=Config(signature_version="s3v4", s3={"addressing_style": "path"}), - ) - return _s3_client + _instance = None + # Guards singleton creation and S3-client init; reached from the operator and upload + # threads. + _lock = threading.Lock() + def __new__(cls): + # Double-checked locking: skip the lock once the instance exists. 
+ if cls._instance is None: + with cls._lock: + if cls._instance is None: + instance = super().__new__(cls) + instance._s3_client = None + cls._instance = instance + return cls._instance -def _ensure_bucket_exists(bucket: str): - """Ensure S3 bucket exists, creating it if necessary.""" - s3 = _get_s3_client() - try: - s3.head_bucket(Bucket=bucket) - except s3.exceptions.NoSuchBucket: - logger.debug(f"Bucket {bucket} not found, creating it") - s3.create_bucket(Bucket=bucket) - logger.info(f"Created bucket: {bucket}") + def _get_s3_client(self): + """Get or initialize the S3 client (lazy initialization, cached).""" + if self._s3_client is None: + with self._lock: + if self._s3_client is None: + try: + import boto3 + from botocore.config import Config + except ImportError as e: + raise RuntimeError( + "boto3 required. Install with: pip install boto3" + ) from e + self._s3_client = boto3.client( + "s3", + endpoint_url=StorageConfig.S3_ENDPOINT, + aws_access_key_id=StorageConfig.S3_AUTH_USERNAME, + aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD, + region_name=StorageConfig.S3_REGION, + config=Config( + signature_version="s3v4", s3={"addressing_style": "path"} + ), + ) + return self._s3_client -def create() -> str: - """ - Creates a new largebinary reference with a unique S3 URI. 
+ def _ensure_bucket_exists(self, bucket: str): + """Ensure the S3 bucket exists, creating it if necessary.""" + s3 = self._get_s3_client() + try: + s3.head_bucket(Bucket=bucket) + except s3.exceptions.NoSuchBucket: + logger.debug(f"Bucket {bucket} not found, creating it") + s3.create_bucket(Bucket=bucket) + logger.info(f"Created bucket: {bucket}") - Returns: - S3 URI string (format: s3://bucket/key) - """ - _ensure_bucket_exists(DEFAULT_BUCKET) - timestamp_ms = int(time.time() * 1000) - unique_id = uuid.uuid4() - object_key = f"objects/{timestamp_ms}/{unique_id}" - return f"s3://{DEFAULT_BUCKET}/{object_key}" + def create(self) -> str: + """Append a unique suffix to the controller-provided base URI. + + Pure string construction (no S3 round-trip); the bucket is created on demand at + upload time. Returns e.g. ``s3://bucket/objects/{execution_id}/{uuid}``. + """ + base_uri = StorageConfig.S3_LARGE_BINARIES_BASE_URI + if not base_uri: + raise RuntimeError( + "largebinary() requires a large-binaries base URI, but none is " + "configured (StorageConfig.S3_LARGE_BINARIES_BASE_URI is unset)." 
+ ) + return f"{base_uri}{uuid.uuid4()}" diff --git a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py index 1ca575dd42..63d4caeb6b 100644 --- a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py +++ b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py @@ -29,7 +29,7 @@ Usage: from typing import Optional, Union from io import IOBase from core.models.type.large_binary import largebinary -from pytexera.storage import large_binary_manager +from pytexera.storage.large_binary_manager import LargeBinaryManager import threading import queue @@ -155,8 +155,9 @@ class LargeBinaryOutputStream(IOBase): def upload_worker(): s3 = None try: - large_binary_manager._ensure_bucket_exists(self._bucket_name) - s3 = large_binary_manager._get_s3_client() + manager = LargeBinaryManager() + manager._ensure_bucket_exists(self._bucket_name) + s3 = manager._get_s3_client() reader = _QueueReader(self._queue) s3.upload_fileobj(reader, self._bucket_name, self._object_key) except Exception as e: diff --git a/amber/src/main/python/texera_run_python_worker.py b/amber/src/main/python/texera_run_python_worker.py index f1ba8c7db6..5c3e25e096 100644 --- a/amber/src/main/python/texera_run_python_worker.py +++ b/amber/src/main/python/texera_run_python_worker.py @@ -70,6 +70,7 @@ if __name__ == "__main__": s3_region, s3_auth_username, s3_auth_password, + s3_large_binaries_base_uri, ) = sys.argv init_loguru_logger(logger_level) StorageConfig.initialize( @@ -87,6 +88,7 @@ if __name__ == "__main__": s3_region, s3_auth_username, s3_auth_password, + s3_large_binaries_base_uri, ) # Setting R_HOME environment variable for R-UDF usage diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala index b8bb3a92b6..c7ed3b6324 
100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala @@ -209,7 +209,8 @@ class PythonWorkflowWorker( StorageConfig.s3Endpoint, StorageConfig.s3Region, StorageConfig.s3Username, - StorageConfig.s3Password + StorageConfig.s3Password, + workerConfig.largeBinaryBaseUri ) ).run(BasicIO.standard(false)) } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala index bb8f3d5775..8166dee6dd 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/WorkerConfig.scala @@ -23,6 +23,7 @@ import org.apache.texera.amber.config.ApplicationConfig import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity import org.apache.texera.amber.core.workflow.PhysicalOp import org.apache.texera.amber.util.VirtualIdentityUtils +import org.apache.texera.service.util.LargeBinaryManager case object WorkerConfig { def generateWorkerConfigs( @@ -45,7 +46,8 @@ case object WorkerConfig { WorkerConfig( VirtualIdentityUtils.createWorkerIdentity(physicalOp.workflowId, physicalOp.id, idx), pveName = physicalOp.pveName, - cuid = cuid + cuid = cuid, + largeBinaryBaseUri = LargeBinaryManager.baseUriForExecution(physicalOp.executionId.id) ) ) } @@ -54,5 +56,8 @@ case object WorkerConfig { case class WorkerConfig( workerId: ActorVirtualIdentity, pveName: String = "", - cuid: Option[Int] = None + cuid: Option[Int] = None, + // Controller-named, execution-scoped base URI under which this worker's large binaries + // live; create() appends a unique suffix. Empty when large binaries are unconfigured. 
+ largeBinaryBaseUri: String = "" ) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala index 0c136d613a..fd4d7786c8 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DPThread.scala @@ -39,6 +39,7 @@ import org.apache.texera.amber.engine.common.ambermessage.{ } import org.apache.texera.amber.engine.common.virtualidentity.util.SELF import org.apache.texera.amber.error.ErrorUtils.safely +import org.apache.texera.service.util.LargeBinaryManager import java.util.concurrent._ @@ -46,7 +47,10 @@ class DPThread( val actorId: ActorVirtualIdentity, dp: DataProcessor, logManager: ReplayLogManager, - internalQueue: LinkedBlockingQueue[DPInputQueueElement] + internalQueue: LinkedBlockingQueue[DPInputQueueElement], + // Controller-named, execution-scoped base URI for large binaries created on this + // thread (empty when unconfigured). Seeded into LargeBinaryManager at thread start. + largeBinaryBaseUri: String = "" ) extends AmberLogging { // initialize dp thread upon construction @@ -91,6 +95,10 @@ class DPThread( dpThread = dpThreadExecutor.submit(new Runnable() { def run(): Unit = { Thread.currentThread().setName(getThreadName) + // Seed this thread's large-binary base URI (from WorkerConfig) before any tuple, + // so create() can append a suffix without the execution id. Once per thread, + // assuming a thread serves one execution. 
+ LargeBinaryManager.setCurrentBaseUri(largeBinaryBaseUri) logger.info("DP thread started") startFuture.complete(()) dp.statisticsManager.initializeWorkerStartTime(System.nanoTime()) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala index d1a0a300d9..13b1d94b04 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala @@ -107,7 +107,13 @@ class WorkflowWorker( ) } // dp is ready - dpThread = new DPThread(workerConfig.workerId, dp, logManager, inputQueue) + dpThread = new DPThread( + workerConfig.workerId, + dp, + logManager, + inputQueue, + workerConfig.largeBinaryBaseUri + ) dpThread.start() } diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala index cb910d11c3..ee559048c9 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala @@ -611,8 +611,6 @@ class WorkflowResource extends LazyLogging { .asScala .toList - LargeBinaryManager.deleteAllObjects() - // Collect all URIs related to executions for cleanup val uris = eids.flatMap { eid => val executionId = ExecutionIdentity(eid.longValue()) @@ -638,6 +636,10 @@ class WorkflowResource extends LazyLogging { } } + // Delete large binaries for each execution belonging to the workflows being + // removed. Done after the transaction (like the document cleanup below). 
+ eids.foreach(eid => LargeBinaryManager.deleteByExecution(eid.longValue())) + // Clean up document storage try { uris.foreach { uri => diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala index 0044c95967..c18b8b50e8 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala @@ -318,7 +318,7 @@ class WorkflowService( * 2. Clears URI references from the execution registry * 3. Safely clears all result and console message documents * 4. Expires Iceberg snapshots for runtime statistics - * 5. Deletes large binaries from MinIO + * 5. Deletes this execution's large binaries from MinIO * * @param eid The execution identity to clean up resources for */ @@ -355,7 +355,7 @@ class WorkflowService( logger.debug(s"Error processing document at $uri: ${error.getMessage}") } } - // Delete large binaries - LargeBinaryManager.deleteAllObjects() + // Delete this execution's large binaries + LargeBinaryManager.deleteByExecution(eid.id) } } diff --git a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py index 9d1fd30698..cfc4f7f676 100644 --- a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py +++ b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py @@ -108,6 +108,7 @@ def sqlite_iceberg_catalog(): s3_region="unused", s3_auth_username="unused", s3_auth_password="unused", + s3_large_binaries_base_uri="s3://texera-large-binaries/objects/0/", ) original_instance = IcebergCatalogInstance._instance diff --git a/amber/src/test/python/core/models/type/test_large_binary.py b/amber/src/test/python/core/models/type/test_large_binary.py index 36310e1dd5..56348f4265 100644 --- 
a/amber/src/test/python/core/models/type/test_large_binary.py +++ b/amber/src/test/python/core/models/type/test_large_binary.py @@ -18,6 +18,10 @@ import pytest from unittest.mock import patch from core.models.type.large_binary import largebinary +from pytexera.storage.large_binary_manager import LargeBinaryManager + +# The manager is a singleton; bind the shared instance for the tests. +large_binary_manager = LargeBinaryManager() class TestLargeBinary: @@ -31,7 +35,7 @@ class TestLargeBinary: def test_create_without_uri(self): """Test creating largebinary without URI (calls large_binary_manager.create).""" - with patch("pytexera.storage.large_binary_manager.create") as mock_create: + with patch.object(large_binary_manager, "create") as mock_create: mock_create.return_value = "s3://bucket/objects/123/uuid" large_binary = largebinary() assert large_binary.uri == "s3://bucket/objects/123/uuid" diff --git a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py index bdba00ba0c..5e61b4ed68 100644 --- a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py +++ b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py @@ -57,6 +57,7 @@ StorageConfig.initialize( s3_region="us-east-1", s3_auth_username="minioadmin", s3_auth_password="minioadmin", + s3_large_binaries_base_uri="s3://texera-large-binaries/objects/0/", ) diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py b/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py index 35bc5bc634..7e6a401aef 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_input_stream.py @@ -20,7 +20,10 @@ from unittest.mock import patch, MagicMock from io import BytesIO from core.models.type.large_binary import largebinary from pytexera.storage.large_binary_input_stream import 
LargeBinaryInputStream -from pytexera.storage import large_binary_manager +from pytexera.storage.large_binary_manager import LargeBinaryManager + +# The manager is a singleton; bind the shared instance for the tests. +large_binary_manager = LargeBinaryManager() class TestLargeBinaryInputStream: diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py index 1942e91f8b..e1da5eee18 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_manager.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_manager.py @@ -15,34 +15,44 @@ # specific language governing permissions and limitations # under the License. +import re + import pytest from unittest.mock import patch, MagicMock -from pytexera.storage import large_binary_manager +from pytexera.storage.large_binary_manager import LargeBinaryManager from core.storage.storage_config import StorageConfig +# The manager is a singleton; bind the shared instance for the tests. +large_binary_manager = LargeBinaryManager() + +# Execution-scoped base URI the controller hands down; create() appends a unique suffix. +TEST_BASE_URI = "s3://texera-large-binaries/objects/1/" + + [email protected](autouse=True) +def _init_storage_config(): + """Initialize StorageConfig (incl. 
the large-binaries base URI) for every test.""" + if not StorageConfig._initialized: + StorageConfig.initialize( + catalog_type="postgres", + postgres_uri_without_scheme="localhost:5432/test", + postgres_username="test", + postgres_password="test", + rest_catalog_uri="http://localhost:8181/catalog/", + rest_catalog_warehouse_name="texera", + table_result_namespace="test", + table_state_namespace="test-state", + directory_path="/tmp/test", + commit_batch_size=1000, + s3_endpoint="http://localhost:9000", + s3_region="us-east-1", + s3_auth_username="minioadmin", + s3_auth_password="minioadmin", + s3_large_binaries_base_uri=TEST_BASE_URI, + ) -class TestLargeBinaryManager: - @pytest.fixture(autouse=True) - def setup_storage_config(self): - """Initialize StorageConfig for tests.""" - if not StorageConfig._initialized: - StorageConfig.initialize( - catalog_type="postgres", - postgres_uri_without_scheme="localhost:5432/test", - postgres_username="test", - postgres_password="test", - rest_catalog_uri="http://localhost:8181/catalog/", - rest_catalog_warehouse_name="texera", - table_result_namespace="test", - table_state_namespace="test-state", - directory_path="/tmp/test", - commit_batch_size=1000, - s3_endpoint="http://localhost:9000", - s3_region="us-east-1", - s3_auth_username="minioadmin", - s3_auth_password="minioadmin", - ) +class TestLargeBinaryManager: def test_get_s3_client_initializes_once(self): """Test that S3 client is initialized and cached.""" # Reset the client @@ -118,37 +128,48 @@ class TestLargeBinaryManager: mock_client.head_bucket.assert_called_once_with(Bucket="test-bucket") mock_client.create_bucket.assert_called_once_with(Bucket="test-bucket") - def test_create_generates_unique_uri(self): - """Test that create() generates a unique S3 URI.""" - large_binary_manager._s3_client = None - - with patch("boto3.client") as mock_boto3_client: - mock_client = MagicMock() - mock_boto3_client.return_value = mock_client - mock_client.head_bucket.return_value = 
None - mock_client.exceptions.NoSuchBucket = type("NoSuchBucket", (Exception,), {}) - - uri = large_binary_manager.create() - - # Check URI format - assert uri.startswith("s3://") - assert uri.startswith(f"s3://{large_binary_manager.DEFAULT_BUCKET}/") - assert "objects/" in uri - - # Verify bucket was checked/created - mock_client.head_bucket.assert_called_once_with( - Bucket=large_binary_manager.DEFAULT_BUCKET - ) - - def test_create_uses_default_bucket(self): - """Test that create() uses the default bucket.""" - large_binary_manager._s3_client = None - - with patch("boto3.client") as mock_boto3_client: - mock_client = MagicMock() - mock_boto3_client.return_value = mock_client - mock_client.head_bucket.return_value = None - mock_client.exceptions.NoSuchBucket = type("NoSuchBucket", (Exception,), {}) - - uri = large_binary_manager.create() - assert large_binary_manager.DEFAULT_BUCKET in uri + def test_create_appends_unique_suffix_to_base_uri(self): + """create() returns the configured base URI plus a unique suffix (no S3 call).""" + base = StorageConfig.S3_LARGE_BINARIES_BASE_URI + + uri1 = large_binary_manager.create() + uri2 = large_binary_manager.create() + + assert uri1.startswith(base) + assert uri2.startswith(base) + # A non-empty, unique suffix follows the base URI. + assert uri1 != base + assert uri1 != uri2 + + +def test_create_matches_execution_scoped_key_shape(monkeypatch): + # The base URI is execution-scoped (controller-named); create() only appends a uuid. + monkeypatch.setattr( + StorageConfig, + "S3_LARGE_BINARIES_BASE_URI", + "s3://texera-large-binaries/objects/42/", + ) + uri = large_binary_manager.create() + assert re.fullmatch(r"s3://texera-large-binaries/objects/42/[0-9a-fA-F-]+", uri) + + +def test_create_without_base_uri_raises(monkeypatch): + # An unconfigured base URI should fail with a clear error, not a cryptic S3 one. 
+ monkeypatch.setattr(StorageConfig, "S3_LARGE_BINARIES_BASE_URI", None) + with pytest.raises(RuntimeError): + large_binary_manager.create() + + +def test_largebinarymanager_is_a_singleton(): + # Constructing the manager always returns the same shared instance. + assert LargeBinaryManager() is LargeBinaryManager() + + # State (the cached S3 client) is shared across handles (same instance). + mgr = LargeBinaryManager() + original = mgr._s3_client + sentinel = object() + mgr._s3_client = sentinel + try: + assert LargeBinaryManager()._s3_client is sentinel + finally: + mgr._s3_client = original diff --git a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py index e35f1400fd..dc8881ab16 100644 --- a/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py +++ b/amber/src/test/python/pytexera/storage/test_large_binary_output_stream.py @@ -24,7 +24,10 @@ from pytexera.storage.large_binary_output_stream import ( LargeBinaryOutputStream, _QueueReader, ) -from pytexera.storage import large_binary_manager +from pytexera.storage.large_binary_manager import LargeBinaryManager + +# The manager is a singleton; bind the shared instance for the tests. 
+large_binary_manager = LargeBinaryManager() class TestLargeBinaryOutputStream: diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala index 25ef739251..31ab5c8e1a 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala @@ -31,6 +31,7 @@ import org.apache.texera.amber.core.virtualidentity.{ } import org.apache.texera.amber.core.workflow._ import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._ +import org.apache.texera.service.util.LargeBinaryManager import org.scalatest.flatspec.AnyFlatSpec import java.net.URI @@ -273,11 +274,15 @@ class SchedulingConfigsSpec extends AnyFlatSpec { // WorkerConfig.generateWorkerConfigs // --------------------------------------------------------------------------- - private def physicalOp(parallelizable: Boolean, suggested: Option[Int]): PhysicalOp = + private def physicalOp( + parallelizable: Boolean, + suggested: Option[Int], + executionId: ExecutionIdentity = ExecutionIdentity(0) + ): PhysicalOp = PhysicalOp( PhysicalOpIdentity(OperatorIdentity("op"), "main"), WorkflowIdentity(0), - ExecutionIdentity(0), + executionId, OpExecInitInfo.Empty, parallelizable = parallelizable, suggestedWorkerNum = suggested @@ -308,4 +313,14 @@ class SchedulingConfigsSpec extends AnyFlatSpec { WorkerConfig.generateWorkerConfigs(physicalOp(parallelizable = true, suggested = None)) assert(configs.size == ApplicationConfig.numWorkerPerOperatorByDefault) } + + it should "set largeBinaryBaseUri to the execution-scoped base URI for every worker" in { + val eid = ExecutionIdentity(42L) + val configs = WorkerConfig.generateWorkerConfigs( + physicalOp(parallelizable = true, 
suggested = Some(3), executionId = eid) + ) + val expected = LargeBinaryManager.baseUriForExecution(eid.id) + assert(expected.contains(s"objects/${eid.id}/")) + assert(configs.forall(_.largeBinaryBaseUri == expected)) + } } diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala index d8b5d57d63..eb1241182e 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/DPThreadSpec.scala @@ -42,11 +42,12 @@ import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFI import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient.ControlInvocation import org.apache.texera.amber.engine.common.storage.SequentialRecordStorage import org.apache.texera.amber.engine.common.virtualidentity.util.SELF +import org.apache.texera.service.util.LargeBinaryManager import org.scalamock.scalatest.MockFactory import org.scalatest.flatspec.AnyFlatSpec import java.net.URI -import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.{CompletableFuture, LinkedBlockingQueue, TimeUnit} class DPThreadSpec extends AnyFlatSpec with MockFactory { @@ -240,4 +241,33 @@ class DPThreadSpec extends AnyFlatSpec with MockFactory { assert(logs.length > 1) } + "DP Thread" should "seed the base URI so create() yields execution-scoped keys" in { + val eid = 7777L + val baseUri = LargeBinaryManager.baseUriForExecution(eid) + // create() runs on the DP thread; capture what it produces there. 
+ val capturedUri = new CompletableFuture[String]() + val inputQueue = new LinkedBlockingQueue[DPInputQueueElement]() + val dp = new DataProcessor(workerId, x => {}, inputMessageQueue = inputQueue) + dp.executor = new OperatorExecutor { + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = { + capturedUri.complete(LargeBinaryManager.create()) + Iterator.empty + } + } + dp.inputManager.addPort(mockInputPortId, schema, List.empty, List.empty) + dp.inputGateway.getChannel(dataChannelId).setPortId(mockInputPortId) + dp.adaptiveBatchingMonitor = mock[WorkerTimerService] + (dp.adaptiveBatchingMonitor.resumeAdaptiveBatching _).expects().anyNumberOfTimes() + val dpThread = new DPThread(workerId, dp, logManager, inputQueue, baseUri) + dpThread.start() + inputQueue.put( + FIFOMessageElement(WorkflowFIFOMessage(dataChannelId, 0, DataFrame(Array(tuples(0))))) + ) + + val uri = capturedUri.get(5, TimeUnit.SECONDS) + assert(uri.startsWith(s"s3://${LargeBinaryManager.DEFAULT_BUCKET}/objects/$eid/")) + // a unique suffix is appended to the execution-scoped base URI + assert(uri.length > baseUri.length) + } + } diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf index 0682109f19..12a9919e04 100644 --- a/common/config/src/main/resources/storage.conf +++ b/common/config/src/main/resources/storage.conf @@ -115,6 +115,10 @@ storage { region = "us-west-2" region = ${?STORAGE_S3_REGION} + # Bucket for large binaries; shared by the JVM and Python workers and cleanup. 
+ large-binaries-bucket = "texera-large-binaries" + large-binaries-bucket = ${?STORAGE_S3_LARGE_BINARIES_BUCKET} + multipart { part-size = "16MB" part-size = ${?STORAGE_S3_MULTIPART_PART_SIZE} diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala index 07447cfdbe..c88541cf1b 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala @@ -80,6 +80,7 @@ object StorageConfig { val s3Region: String = conf.getString("storage.s3.region") val s3Username: String = conf.getString("storage.s3.auth.username") val s3Password: String = conf.getString("storage.s3.auth.password") + val s3LargeBinariesBucket: String = conf.getString("storage.s3.large-binaries-bucket") val s3MultipartUploadPartSize: Long = parseSizeStringToBytes( conf.getString("storage.s3.multipart.part-size") ) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala index 44db3929f2..2fa4acb530 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala @@ -20,44 +20,88 @@ package org.apache.texera.service.util import com.typesafe.scalalogging.LazyLogging +import org.apache.texera.amber.config.StorageConfig import java.util.UUID -/** - * Manages the lifecycle of LargeBinaries stored in S3. - * - * Handles creation and deletion of large objects that exceed - * normal tuple size limits. - */ +/** Manages the lifecycle of LargeBinaries (objects too large for normal tuples) in S3. 
*/ object LargeBinaryManager extends LazyLogging { - val DEFAULT_BUCKET: String = "texera-large-binaries" + // From config so the JVM/Python workers and cleanup all share one bucket. + val DEFAULT_BUCKET: String = StorageConfig.s3LargeBinariesBucket + + /** Per-execution key prefix; the single source of truth for the write and delete paths. */ + private def executionPrefix(executionId: Long): String = s"objects/$executionId" /** - * Creates a new LargeBinary reference. - * The actual data upload happens separately via LargeBinaryOutputStream. + * Base URI (trailing slash) under which `executionId`'s large binaries live; create() + * appends a unique suffix. Empty when the bucket is unconfigured, so create() fails loudly. * - * @return S3 URI string for the new LargeBinary (format: s3://bucket/key) + * `executionId` must be a persisted EID. The sentinel id (DEFAULT_EXECUTION_ID = 1) shares + * this space, so binaries must only be created under a real execution — else execution 1 + * and a default-context run would collide under objects/1/. */ - def create(): String = { - val objectKey = s"objects/${System.currentTimeMillis()}/${UUID.randomUUID()}" - val uri = s"s3://$DEFAULT_BUCKET/$objectKey" + def baseUriForExecution(executionId: Long): String = + if (DEFAULT_BUCKET.isEmpty) "" + else s"s3://$DEFAULT_BUCKET/${executionPrefix(executionId)}/" + + /** + * Base URI for binaries created on the current thread — thread-local because create() + * runs on the DP thread. Seeded once at DP-thread start, assuming a thread serves one + * execution; re-seed if workers are ever reused across executions. + */ + private val currentBaseUri: ThreadLocal[Option[String]] = + ThreadLocal.withInitial(() => Option.empty[String]) + + /** Sets the current thread's base URI; an empty value clears it, so create() fails loudly. 
*/ + def setCurrentBaseUri(baseUri: String): Unit = + currentBaseUri.set(Option(baseUri).filter(_.nonEmpty)) - uri + /** + * Creates a LargeBinary reference by appending a unique suffix to the current thread's + * base URI. Data is uploaded separately via LargeBinaryOutputStream. + * + * @return e.g. s3://bucket/objects/{eid}/{uuid} + */ + def create(): String = { + val baseUri = currentBaseUri + .get() + .getOrElse( + throw new IllegalStateException( + "LargeBinaryManager.create() requires a base URI, " + + "but none was set on the current thread." + ) + ) + s"$baseUri${UUID.randomUUID()}" } /** - * Deletes all large binaries from the bucket. + * Deletes all large binaries for one execution. Uses deleteDirectory, which removes one + * listing page (<= 1000 objects) — enough for expected counts; more needs a paginated delete. * - * @throws java.lang.Exception if the deletion fails - * @return Unit + * @param executionId the execution whose large binaries should be removed */ - def deleteAllObjects(): Unit = { + def deleteByExecution(executionId: Long): Unit = + deleteByExecution(executionId, S3StorageClient.deleteDirectory) + + /** Overload taking the delete op as a parameter. Visible for testing. */ + private[util] def deleteByExecution( + executionId: Long, + deleteDir: (String, String) => Unit + ): Unit = { try { - S3StorageClient.deleteDirectory(DEFAULT_BUCKET, "objects") - logger.info(s"Successfully deleted all large binaries from bucket: $DEFAULT_BUCKET") + deleteDir(DEFAULT_BUCKET, executionPrefix(executionId)) + logger.info( + s"Deleted large binaries for execution $executionId from bucket: $DEFAULT_BUCKET" + ) } catch { + // Swallowed: cleanup is a side effect of deletion and must not fail it. Logged at + // error because a failure here silently leaks storage. 
case e: Exception => - logger.warn(s"Failed to delete large binaries from bucket: $DEFAULT_BUCKET", e) + logger.error( + s"Failed to delete large binaries for execution $executionId " + + s"from bucket: $DEFAULT_BUCKET", + e + ) } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala index 77d142efee..a0436e682e 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerSpec.scala @@ -21,8 +21,22 @@ package org.apache.texera.service.util import org.apache.texera.amber.core.tuple.LargeBinary import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.BeforeAndAfterEach -class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { +class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase with BeforeAndAfterEach { + + /** Execution id used by the bulk of the tests. */ + private val TestExecutionId: Long = 9999L + + /** Seeds the thread's base URI for an execution, as the controller does in production. */ + private def setExecutionContext(eid: Long): Unit = + LargeBinaryManager.setCurrentBaseUri(LargeBinaryManager.baseUriForExecution(eid)) + + /** Each test creates large binaries; they need a base URI on the thread. */ + override def beforeEach(): Unit = { + super.beforeEach() + setExecutionContext(TestExecutionId) + } /** Creates a large binary from string data and returns it. 
*/ private def createLargeBinary(data: String): LargeBinary = { @@ -54,7 +68,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(stream.readAllBytes().sameElements(data.getBytes)) stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should read exact number of bytes") { @@ -67,7 +81,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(result.sameElements("0123456789".getBytes)) stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should handle reading more bytes than available") { @@ -81,7 +95,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(result.sameElements(data.getBytes)) stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should support standard single-byte read") { @@ -94,7 +108,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(stream.read() == -1) // EOF stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should return -1 at EOF") { @@ -105,7 +119,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(stream.read() == -1) stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should throw exception when reading from closed stream") { @@ -117,7 +131,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assertThrows[java.io.IOException](stream.read()) assertThrows[java.io.IOException](stream.readAllBytes()) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } 
test("LargeBinaryInputStream should handle multiple close calls") { @@ -127,7 +141,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { stream.close() stream.close() // Should not throw - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should read large data correctly") { @@ -145,7 +159,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(result.sameElements(largeData)) stream.close() - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } // ======================================== @@ -183,7 +197,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { } } - test("LargeBinaryManager should delete all large binaries") { + test("deleteByExecution removes binaries written via new LargeBinary()") { val pointer1 = new LargeBinary() val out1 = new LargeBinaryOutputStream(pointer1) try { @@ -200,18 +214,18 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { out2.close() } - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryManager should handle delete with no objects gracefully") { - LargeBinaryManager.deleteAllObjects() // Should not throw exception + LargeBinaryManager.deleteByExecution(TestExecutionId) // Should not throw exception } - test("LargeBinaryManager should delete all objects") { + test("deleteByExecution removes multiple binaries for the execution") { val pointer1 = createLargeBinary("Test data") val pointer2 = createLargeBinary("Test data") - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryManager should create bucket if it doesn't exist") { @@ -219,7 +233,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assertStandardBucket(pointer) - LargeBinaryManager.deleteAllObjects() + 
LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryManager should handle large objects correctly") { @@ -237,7 +251,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { stream.close() assert(readData.sameElements(largeData)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryManager should generate unique URIs for different objects") { @@ -261,7 +275,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(pointer1.getUri != pointer2.getUri) assert(pointer1.getObjectKey != pointer2.getObjectKey) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream should handle multiple reads from the same large binary") { @@ -279,7 +293,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData1.sameElements(data.getBytes)) assert(readData2.sameElements(data.getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryManager should properly parse bucket name and object key from large binary") { @@ -289,7 +303,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(largeBinary.getObjectKey.nonEmpty) assert(!largeBinary.getObjectKey.startsWith("/")) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } // ======================================== @@ -309,7 +323,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assertStandardBucket(largeBinary) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryInputStream constructor should read large binary contents") { @@ -322,7 +336,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements(data.getBytes)) - 
LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream and LargeBinaryInputStream should work together end-to-end") { @@ -344,7 +358,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements(data.getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } // ======================================== @@ -368,7 +382,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements(data.getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream should create large binary") { @@ -381,7 +395,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assertStandardBucket(largeBinary) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream should handle large data correctly") { @@ -399,7 +413,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements(largeData)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream should handle multiple writes") { @@ -416,7 +430,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements("Hello World!".getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream should throw exception when writing to closed stream") { @@ -427,7 +441,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assertThrows[java.io.IOException](outStream.write("more".getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinaryOutputStream should handle 
close() being called multiple times") { @@ -437,7 +451,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { outStream.close() outStream.close() // Should not throw - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("New LargeBinary() constructor should create unique URIs") { @@ -447,7 +461,7 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(largeBinary1.getUri != largeBinary2.getUri) assert(largeBinary1.getObjectKey != largeBinary2.getObjectKey) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) } test("LargeBinary() and LargeBinaryOutputStream API should be symmetric with input") { @@ -466,6 +480,47 @@ class LargeBinaryManagerSpec extends AnyFunSuite with S3StorageTestBase { assert(readData.sameElements(data.getBytes)) - LargeBinaryManager.deleteAllObjects() + LargeBinaryManager.deleteByExecution(TestExecutionId) + } + + test("deleteByExecution removes only the target execution's binaries") { + // Create one binary under execution 1001 and another under 1002. + setExecutionContext(1001L) + createLargeBinary("data for 1001") + setExecutionContext(1002L) + createLargeBinary("data for 1002") + + // Delete only execution 1001's binaries. + LargeBinaryManager.deleteByExecution(1001L) + + try { + assert(!S3StorageClient.directoryExists("texera-large-binaries", "objects/1001")) + assert(S3StorageClient.directoryExists("texera-large-binaries", "objects/1002")) + } finally { + // Keep the test self-contained: clean up 1002's objects and reset the thread's + // base URI rather than relying on the next test's beforeEach. + LargeBinaryManager.deleteByExecution(1002L) + setExecutionContext(TestExecutionId) + } + } + + test("deleteByExecution distinguishes executions whose ids share a numeric prefix") { + // "objects/1" is a string prefix of "objects/11"; a prefix delete that dropped the + // trailing "/" would wipe both. 
Pin that deleting execution 1 leaves 11 untouched. + setExecutionContext(1L) + createLargeBinary("data for execution 1") + setExecutionContext(11L) + createLargeBinary("data for execution 11") + + LargeBinaryManager.deleteByExecution(1L) + + try { + assert(!S3StorageClient.directoryExists("texera-large-binaries", "objects/1")) + assert(S3StorageClient.directoryExists("texera-large-binaries", "objects/11")) + } finally { + // Self-contained cleanup, mirroring the isolation test above. + LargeBinaryManager.deleteByExecution(11L) + setExecutionContext(TestExecutionId) + } } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala new file mode 100644 index 0000000000..d6268d29d3 --- /dev/null +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/LargeBinaryManagerUnitSpec.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.service.util + +import org.scalatest.funsuite.AnyFunSuite + +/** + * Unit tests for [[LargeBinaryManager.deleteByExecution]] with the directory-delete op + * injected, so success and the swallow-and-log error path run without a live S3 endpoint. + */ +class LargeBinaryManagerUnitSpec extends AnyFunSuite { + + test("deleteByExecution issues a delete scoped to the execution's object prefix") { + var captured: Option[(String, String)] = None + LargeBinaryManager.deleteByExecution( + 42L, + (bucket, prefix) => captured = Some((bucket, prefix)) + ) + assert(captured.contains((LargeBinaryManager.DEFAULT_BUCKET, "objects/42"))) + } + + test("deleteByExecution swallows exceptions raised by the underlying delete") { + // The error path logs and returns; it must not propagate the failure to callers. + LargeBinaryManager.deleteByExecution(7L, (_, _) => throw new RuntimeException("boom")) + succeed + } + + test("create returns a URI under the current thread's base URI") { + // create() reads a thread-local; run on a dedicated thread so the base URI is + // isolated and does not leak into other tests. + @volatile var uri: String = "" + val thread = new Thread(() => { + LargeBinaryManager.setCurrentBaseUri(LargeBinaryManager.baseUriForExecution(555L)) + uri = LargeBinaryManager.create() + }) + thread.start() + thread.join() + val prefix = s"s3://${LargeBinaryManager.DEFAULT_BUCKET}/objects/555/" + assert(uri.startsWith(prefix)) + // a unique (UUID) suffix follows the execution-scoped prefix + assert(uri.stripPrefix(prefix).nonEmpty) + } + + test("create throws when no base URI is set on the thread") { + // A fresh thread starts with no base URI, so create() must fail fast. + @volatile var caught: Option[Throwable] = None + val thread = new Thread(() => { + try LargeBinaryManager.create() + catch { case e: Throwable => caught = Some(e) } + }) + thread.start() + thread.join() + assert(caught.exists(_.isInstanceOf[IllegalStateException])) + } +}
