This is an automated email from the ASF dual-hosted git repository. juergbi pushed a commit to branch juerg/buildbox-asset-remote in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit e1fbeaebff873c9fc7f29585723ce041fe2dd901 Author: Jürg Billeter <[email protected]> AuthorDate: Fri May 31 12:52:16 2024 +0200 Move `CASDProcessManager` instance from `CASCache` to `Context` --- src/buildstream/_cas/__init__.py | 1 + src/buildstream/_cas/cascache.py | 37 ++++----------------- src/buildstream/_cas/casdprocessmanager.py | 2 ++ src/buildstream/_context.py | 32 ++++++++++++------ src/buildstream/_testing/runcli.py | 2 +- tests/artifactcache/expiry.py | 8 ++--- tests/internals/cascache.py | 27 +++++++-------- tests/internals/storage.py | 8 ++--- tests/internals/storage_vdir_import.py | 38 +++++----------------- tests/testutils/__init__.py | 1 + tests/testutils/artifactshare.py | 2 +- .../_cas/__init__.py => tests/testutils/casd.py | 24 +++++++++++--- 12 files changed, 78 insertions(+), 104 deletions(-) diff --git a/src/buildstream/_cas/__init__.py b/src/buildstream/_cas/__init__.py index 5f9c4abb1..52f425d56 100644 --- a/src/buildstream/_cas/__init__.py +++ b/src/buildstream/_cas/__init__.py @@ -15,4 +15,5 @@ # Tristan Van Berkom <[email protected]> from .cascache import CASCache, CASLogLevel +from .casdprocessmanager import CASDProcessManager from .casremote import CASRemote diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index 8110337bc..627778c16 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -32,7 +32,6 @@ from .. import utils from ..types import FastEnum, SourceRef from .._exceptions import CASCacheError -from .casdprocessmanager import CASDProcessManager from .casremote import CASRemote, _CASBatchRead, _CASBatchUpdate, BlobNotFound _BUFFER_SIZE = 65536 @@ -52,25 +51,11 @@ class CASLogLevel(FastEnum): # # Args: # path (str): The root directory for the CAS repository -# casd (bool): True to spawn buildbox-casd (default) or False (testing only) -# cache_quota (int): User configured cache quota -# protect_session_blobs (bool): Disable expiry for blobs used in the current session -# log_level (LogLevel): Log level to give to buildbox-casd for logging -# log_directory (str): the root of the directory in which to store logs +# casd (CASDProcessManager): The buildbox-casd manager +# remote_cache (bool): True if a CAS server is configured as a remote cache (storage-service) # class CASCache: - def __init__( - self, - path, - *, - casd=True, - cache_quota=None, - remote_cache_spec=None, - protect_session_blobs=True, - log_level=CASLogLevel.WARNING, - log_directory=None, - messenger=None - ): + def __init__(self, path, *, casd, remote_cache=False): self.casdir = os.path.join(path, "cas") self.tmpdir = os.path.join(path, "tmp") os.makedirs(self.tmpdir, exist_ok=True) @@ -78,16 +63,10 @@ class CASCache: self._cache_usage_monitor = None self._cache_usage_monitor_forbidden = False - self._remote_cache = bool(remote_cache_spec) + self._remote_cache = remote_cache - self._casd = None + self._casd = casd if casd: - assert log_directory is not None, "log_directory is required when casd is True" - log_dir = os.path.join(log_directory, "_casd") - self._casd = CASDProcessManager( - path, log_dir, log_level, cache_quota, remote_cache_spec, protect_session_blobs, messenger - ) - self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd) self._cache_usage_monitor.start() else: @@ -124,15 +103,11 @@ class CASCache: # # Release resources used by CASCache. # - def release_resources(self, messenger=None): + def release_resources(self): if self._cache_usage_monitor: self._cache_usage_monitor.stop() self._cache_usage_monitor.join() - if self._casd: - self._casd.release_resources(messenger) - self._casd = None - def get_default_remote(self): return self._default_remote diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py index b0f75a4a4..7093880c2 100644 --- a/src/buildstream/_cas/casdprocessmanager.py +++ b/src/buildstream/_cas/casdprocessmanager.py @@ -63,6 +63,8 @@ _REQUIRED_CASD_MICRO = 58 # class CASDProcessManager: def __init__(self, path, log_dir, log_level, cache_quota, remote_cache_spec, protect_session_blobs, messenger): + os.makedirs(path, exist_ok=True) + self._log_dir = log_dir self._socket_path = self._make_socket_path(path) diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index 545e5c37e..31b2df85f 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -30,7 +30,7 @@ from ._artifactcache import ArtifactCache from ._elementsourcescache import ElementSourcesCache from ._remotespec import RemoteSpec, RemoteExecutionSpec from ._sourcecache import SourceCache -from ._cas import CASCache, CASLogLevel +from ._cas import CASCache, CASDProcessManager, CASLogLevel from .types import _CacheBuildTrees, _PipelineSelection, _SchedulerErrorAction, _SourceUriPolicy from ._workspaces import Workspaces, WorkspaceProjectCache from .node import Node, MappingNode @@ -223,6 +223,7 @@ class Context: self._project_overrides: MappingNode = Node.from_dict({}) self._workspaces: Optional[Workspaces] = None self._workspace_project_cache: WorkspaceProjectCache = WorkspaceProjectCache() + self._casd: Optional[CASDProcessManager] = None self._cascache: Optional[CASCache] = None # __enter__() @@ -247,7 +248,11 @@ class Context: self._sourcecache.release_resources() if self._cascache: - self._cascache.release_resources(self.messenger) + self._cascache.release_resources() + + if self._casd: + self._casd.release_resources(self.messenger) + self._casd = None # load() # @@ -677,8 +682,8 @@ class Context: # value which we cache here too. return self._strict_build_plan - def get_cascache(self) -> CASCache: - if self._cascache is None: + def get_casd(self) -> CASDProcessManager: + if self._casd is None: if self.log_debug: log_level = CASLogLevel.TRACE elif self.log_verbose: @@ -686,15 +691,22 @@ class Context: else: log_level = CASLogLevel.WARNING - self._cascache = CASCache( + assert self.logdir is not None, "log_directory is required for casd" + log_dir = os.path.join(self.logdir, "_casd") + self._casd = CASDProcessManager( self.cachedir, - casd=self.use_casd, - cache_quota=self.config_cache_quota, - remote_cache_spec=self.remote_cache_spec, - log_level=log_level, - log_directory=self.logdir, + log_dir, + log_level, + self.config_cache_quota, + self.remote_cache_spec, + protect_session_blobs=True, messenger=self.messenger, ) + return self._casd + + def get_cascache(self) -> CASCache: + if self._cascache is None: + self._cascache = CASCache(self.cachedir, casd=self.get_casd(), remote_cache=bool(self.remote_cache_spec)) return self._cascache ###################################################### diff --git a/src/buildstream/_testing/runcli.py b/src/buildstream/_testing/runcli.py index 8daaf08b6..87682f009 100644 --- a/src/buildstream/_testing/runcli.py +++ b/src/buildstream/_testing/runcli.py @@ -725,7 +725,7 @@ class TestArtifact: def _extract_subdirectory(self, tmpdir, digest): with tempfile.TemporaryDirectory() as extractdir: try: - cas = CASCache(str(tmpdir), casd=False) + cas = CASCache(str(tmpdir), casd=None) cas.checkout(extractdir, digest) yield extractdir except FileNotFoundError: diff --git a/tests/artifactcache/expiry.py b/tests/artifactcache/expiry.py index 3fd312174..629e34601 100644 --- a/tests/artifactcache/expiry.py +++ b/tests/artifactcache/expiry.py @@ -22,20 +22,18 @@ import time import pytest -from buildstream._cas import CASCache from buildstream.exceptions import ErrorDomain, LoadErrorReason from buildstream._testing import cli # pylint: disable=unused-import from buildstream._testing._utils.site import have_subsecond_mtime -from tests.testutils import create_element_size, wait_for_cache_granularity +from tests.testutils import casd_cache, create_element_size, wait_for_cache_granularity DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "expiry") def get_cache_usage(directory): - cas_cache = CASCache(directory, log_directory=os.path.dirname(directory)) - try: + with casd_cache(directory) as cas_cache: wait = 0.1 for _ in range(0, int(5 / wait)): used_size = cas_cache.get_cache_usage().used_size @@ -45,8 +43,6 @@ def get_cache_usage(directory): assert False, "Unable to retrieve cache usage" return None - finally: - cas_cache.release_resources() # Ensure that the cache successfully removes an old artifact if we do diff --git a/tests/internals/cascache.py b/tests/internals/cascache.py index 23f7575c4..6c51f9045 100644 --- a/tests/internals/cascache.py +++ b/tests/internals/cascache.py @@ -15,9 +15,9 @@ import os import time from unittest.mock import MagicMock -from buildstream._cas.cascache import CASCache from buildstream._cas import casdprocessmanager from buildstream._messenger import Messenger +from tests.testutils import casd_cache # @@ -42,9 +42,8 @@ def test_report_when_cascache_dies_before_asked_to(tmp_path, monkeypatch): monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep) messenger = MagicMock(spec_set=Messenger) - cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs"))) - time.sleep(1) - cache.release_resources(messenger) + with casd_cache(tmp_path.joinpath("casd"), messenger) as cache: + time.sleep(1) assert messenger.bug.call_count == 1 @@ -64,9 +63,8 @@ def test_report_when_cascache_exits_not_cleanly(tmp_path, monkeypatch): monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1) messenger = MagicMock(spec_set=Messenger) - cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs"))) - time.sleep(1) - cache.release_resources(messenger) + with casd_cache(tmp_path.joinpath("casd"), messenger) as cache: + time.sleep(1) assert messenger.bug.call_count == 1 @@ -86,9 +84,8 @@ def test_report_when_cascache_is_forcefully_killed(tmp_path, monkeypatch): monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1) messenger = MagicMock(spec_set=Messenger) - cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs"))) - time.sleep(1) - cache.release_resources(messenger) + with casd_cache(tmp_path.joinpath("casd"), messenger) as cache: + time.sleep(1) assert messenger.warn.call_count == 1 @@ -114,9 +111,8 @@ def test_casd_redirects_stderr_to_file_and_rotate(tmp_path, monkeypatch): # Let's create the first `n_max_log_files` log files for i in range(1, n_max_log_files + 1): - cache = CASCache(str(casd_files_path), casd=True, log_directory=str(casd_parent_logs_path)) - time.sleep(0.5) - cache.release_resources() + with casd_cache(casd_files_path) as cache: + time.sleep(0.5) existing_log_files = sorted(casd_logs_path.iterdir()) assert len(existing_log_files) == i @@ -126,9 +122,8 @@ def test_casd_redirects_stderr_to_file_and_rotate(tmp_path, monkeypatch): for _ in range(3): evicted_file = existing_log_files.pop(0) - cache = CASCache(str(casd_files_path), casd=True, log_directory=str(casd_parent_logs_path)) - time.sleep(0.5) - cache.release_resources() + with casd_cache(casd_files_path) as cache: + time.sleep(0.5) existing_log_files = sorted(casd_logs_path.iterdir()) assert len(existing_log_files) == n_max_log_files diff --git a/tests/internals/storage.py b/tests/internals/storage.py index 49bb7043c..f3b4031cb 100644 --- a/tests/internals/storage.py +++ b/tests/internals/storage.py @@ -24,10 +24,11 @@ from typing import List, Optional import pytest from buildstream import DirectoryError, FileType -from buildstream._cas import CASCache from buildstream.storage._casbaseddirectory import CasBasedDirectory from buildstream.storage._filebaseddirectory import FileBasedDirectory +from tests.testutils import casd_cache + DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "storage") @@ -38,11 +39,8 @@ def setup_backend(backend_class, tmpdir): os.mkdir(path) yield backend_class(path) else: - cas_cache = CASCache(os.path.join(tmpdir, "cas"), log_directory=os.path.join(tmpdir, "logs")) - try: + with casd_cache(os.path.join(tmpdir, "cas")) as cas_cache: yield backend_class(cas_cache) - finally: - cas_cache.release_resources() @pytest.mark.parametrize("backend", [FileBasedDirectory, CasBasedDirectory]) diff --git a/tests/internals/storage_vdir_import.py b/tests/internals/storage_vdir_import.py index e0ca134b6..637283e69 100644 --- a/tests/internals/storage_vdir_import.py +++ b/tests/internals/storage_vdir_import.py @@ -20,10 +20,11 @@ import pytest from buildstream import DirectoryError from buildstream.storage._casbaseddirectory import CasBasedDirectory from buildstream.storage._filebaseddirectory import FileBasedDirectory -from buildstream._cas import CASCache from buildstream.utils import _set_file_mtime from buildstream._testing._utils.site import have_subsecond_mtime +from tests.testutils import casd_cache + # These are comparitive tests that check that FileBasedDirectory and # CasBasedDirectory act identically. @@ -189,8 +190,7 @@ def _import_test(tmpdir, original, overlay, generator_function, verify_contents= if not have_subsecond_mtime(str(tmpdir)): pytest.skip("Filesystem does not support subsecond mtime precision: {}".format(str(tmpdir))) - cas_cache = CASCache(tmpdir, log_directory=os.path.join(tmpdir, "logs")) - try: + with casd_cache(os.path.join(tmpdir, "casd")) as cas_cache: # Create some fake content generator_function(original, tmpdir) if original != overlay: @@ -245,8 +245,6 @@ def _import_test(tmpdir, original, overlay, generator_function, verify_contents= duplicate_cas._import_files_internal(roundtrip_dir, properties=["mtime"]) assert duplicate_cas._get_digest().hash == d._get_digest().hash - finally: - cas_cache.release_resources() @pytest.mark.parametrize("original", range(1, len(root_filesets) + 1)) @@ -262,8 +260,7 @@ def test_random_cas_import(tmpdir, original, overlay): def _listing_test(tmpdir, root, generator_function): - cas_cache = CASCache(tmpdir, log_directory=os.path.join(tmpdir, "logs")) - try: + with casd_cache(os.path.join(tmpdir, "casd")) as cas_cache: # Create some fake content generator_function(root, tmpdir) @@ -274,8 +271,6 @@ def _listing_test(tmpdir, root, generator_function): filelist2 = list(d2.list_relative_paths()) assert filelist == filelist2 - finally: - cas_cache.release_resources() @pytest.mark.parametrize("root", range(1, NUM_RANDOM_TESTS + 1)) @@ -291,8 +286,7 @@ def test_fixed_directory_listing(tmpdir, root): # Check that the vdir is decending and readable def test_open_directory(tmpdir): cas_dir = os.path.join(str(tmpdir), "cas") - cas_cache = CASCache(cas_dir, log_directory=os.path.join(str(tmpdir), "logs")) - try: + with casd_cache(cas_dir) as cas_cache: d = CasBasedDirectory(cas_cache) Content_to_check = "You got me" @@ -306,8 +300,6 @@ def test_open_directory(tmpdir): with open(cas_cache.objpath(digest), encoding="utf-8") as fp: content = fp.read() assert Content_to_check == content - finally: - cas_cache.release_resources() # Check symlink logic for edgecases @@ -315,8 +307,7 @@ def test_open_directory(tmpdir): # to decend in to files or links to files def test_bad_symlinks(tmpdir): cas_dir = os.path.join(str(tmpdir), "cas") - cas_cache = CASCache(cas_dir, log_directory=os.path.join(str(tmpdir), "logs")) - try: + with casd_cache(cas_dir) as cas_cache: d = CasBasedDirectory(cas_cache) test_dir = os.path.join(str(tmpdir), "importfrom") @@ -336,16 +327,13 @@ def test_bad_symlinks(tmpdir): with pytest.raises(DirectoryError) as error: d.open_directory("a/f") assert error.reason == exp_reason - finally: - cas_cache.release_resources() # Check symlink logic for edgecases # Check decend accross relitive link def test_relative_symlink(tmpdir): cas_dir = os.path.join(str(tmpdir), "cas") - cas_cache = CASCache(cas_dir, log_directory=os.path.join(str(tmpdir), "logs")) - try: + with casd_cache(cas_dir) as cas_cache: d = CasBasedDirectory(cas_cache) Content_to_check = "You got me" @@ -363,16 +351,13 @@ def test_relative_symlink(tmpdir): with open(cas_cache.objpath(digest), encoding="utf-8") as fp: content = fp.read() assert Content_to_check == content - finally: - cas_cache.release_resources() # Check symlink logic for edgecases # Check deccend accross abs link def test_abs_symlink(tmpdir): cas_dir = os.path.join(str(tmpdir), "cas") - cas_cache = CASCache(cas_dir, log_directory=os.path.join(str(tmpdir), "logs")) - try: + with casd_cache(cas_dir) as cas_cache: d = CasBasedDirectory(cas_cache) Content_to_check = "two step file" @@ -391,16 +376,13 @@ def test_abs_symlink(tmpdir): with open(cas_cache.objpath(digest), encoding="utf-8") as fp: content = fp.read() assert Content_to_check == content - finally: - cas_cache.release_resources() # Check symlink logic for edgecases # Check symlink can not escape root def test_bad_sym_escape(tmpdir): cas_dir = os.path.join(str(tmpdir), "cas") - cas_cache = CASCache(cas_dir, log_directory=os.path.join(str(tmpdir), "logs")) - try: + with casd_cache(cas_dir) as cas_cache: d = CasBasedDirectory(cas_cache) test_dir = os.path.join(str(tmpdir), "importfrom") @@ -417,5 +399,3 @@ def test_bad_sym_escape(tmpdir): with pytest.raises(DirectoryError) as error: d.open_directory("a/l", follow_symlinks=True) assert error.reason == "directory-not-found" - finally: - cas_cache.release_resources() diff --git a/tests/testutils/__init__.py b/tests/testutils/__init__.py index 028131e4b..03eb422ea 100644 --- a/tests/testutils/__init__.py +++ b/tests/testutils/__init__.py @@ -20,6 +20,7 @@ # from .artifactshare import create_artifact_share, create_split_share, assert_shared, assert_not_shared, ArtifactShare +from .casd import casd_cache from .context import dummy_context from .element_generators import create_element_size from .junction import generate_junction diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py index 515164aac..fcdbcb597 100644 --- a/tests/testutils/artifactshare.py +++ b/tests/testutils/artifactshare.py @@ -142,7 +142,7 @@ class ArtifactShare(BaseArtifactShare): super().__init__() # Set after subprocess creation as it's not picklable - self.cas = CASCache(self.repodir, casd=False) + self.cas = CASCache(self.repodir, casd=None) def _create_server(self): return create_server( diff --git a/src/buildstream/_cas/__init__.py b/tests/testutils/casd.py similarity index 50% copy from src/buildstream/_cas/__init__.py copy to tests/testutils/casd.py index 5f9c4abb1..9a3eb8e71 100644 --- a/src/buildstream/_cas/__init__.py +++ b/tests/testutils/casd.py @@ -10,9 +10,23 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# -# Authors: -# Tristan Van Berkom <[email protected]> -from .cascache import CASCache, CASLogLevel -from .casremote import CASRemote +import os +from contextlib import contextmanager + +from buildstream._cas import CASCache, CASDProcessManager, CASLogLevel + + +@contextmanager +def casd_cache(path, messenger=None): + casd = CASDProcessManager( + str(path), os.path.join(str(path), "..", "logs", "_casd"), CASLogLevel.WARNING, None, None, True, None + ) + try: + cascache = CASCache(str(path), casd=casd) + try: + yield cascache + finally: + cascache.release_resources() + finally: + casd.release_resources(messenger)
