This is an automated email from the ASF dual-hosted git repository.

juergbi pushed a commit to branch juerg/buildbox-asset-remote
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit e1fbeaebff873c9fc7f29585723ce041fe2dd901
Author: Jürg Billeter <[email protected]>
AuthorDate: Fri May 31 12:52:16 2024 +0200

    Move `CASDProcessManager` instance from `CASCache` to `Context`
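
    The `Context` object now creates and owns the `CASDProcessManager`
    instance via a new `get_casd()` accessor, and `CASCache` receives the
    manager (or `None`) through its `casd` argument instead of spawning
    `buildbox-casd` itself. Tests set up both objects through the new
    `casd_cache()` helper in `tests/testutils`.

    A minimal sketch of the helper's usage, using only names taken from
    this patch (the path argument is illustrative):

        from tests.testutils import casd_cache

        # Spawns buildbox-casd for the lifetime of the block and releases
        # both the CASCache and the process manager on exit.
        with casd_cache("/tmp/example-cas") as cas_cache:
            usage = cas_cache.get_cache_usage()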
---
 src/buildstream/_cas/__init__.py                   |  1 +
 src/buildstream/_cas/cascache.py                   | 37 ++++-----------------
 src/buildstream/_cas/casdprocessmanager.py         |  2 ++
 src/buildstream/_context.py                        | 32 ++++++++++++------
 src/buildstream/_testing/runcli.py                 |  2 +-
 tests/artifactcache/expiry.py                      |  8 ++---
 tests/internals/cascache.py                        | 27 +++++++--------
 tests/internals/storage.py                         |  8 ++---
 tests/internals/storage_vdir_import.py             | 38 +++++-----------------
 tests/testutils/__init__.py                        |  1 +
 tests/testutils/artifactshare.py                   |  2 +-
 .../_cas/__init__.py => tests/testutils/casd.py    | 24 +++++++++++---
 12 files changed, 78 insertions(+), 104 deletions(-)

diff --git a/src/buildstream/_cas/__init__.py b/src/buildstream/_cas/__init__.py
index 5f9c4abb1..52f425d56 100644
--- a/src/buildstream/_cas/__init__.py
+++ b/src/buildstream/_cas/__init__.py
@@ -15,4 +15,5 @@
 #        Tristan Van Berkom <[email protected]>
 
 from .cascache import CASCache, CASLogLevel
+from .casdprocessmanager import CASDProcessManager
 from .casremote import CASRemote
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 8110337bc..627778c16 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -32,7 +32,6 @@ from .. import utils
 from ..types import FastEnum, SourceRef
 from .._exceptions import CASCacheError
 
-from .casdprocessmanager import CASDProcessManager
 from .casremote import CASRemote, _CASBatchRead, _CASBatchUpdate, BlobNotFound
 
 _BUFFER_SIZE = 65536
@@ -52,25 +51,11 @@ class CASLogLevel(FastEnum):
 #
 # Args:
 #     path (str): The root directory for the CAS repository
-#     casd (bool): True to spawn buildbox-casd (default) or False (testing only)
-#     cache_quota (int): User configured cache quota
-#     protect_session_blobs (bool): Disable expiry for blobs used in the current session
-#     log_level (LogLevel): Log level to give to buildbox-casd for logging
-#     log_directory (str): the root of the directory in which to store logs
+#     casd (CASDProcessManager): The buildbox-casd manager
+#     remote_cache (bool): True if a CAS server is configured as a remote cache (storage-service)
 #
 class CASCache:
-    def __init__(
-        self,
-        path,
-        *,
-        casd=True,
-        cache_quota=None,
-        remote_cache_spec=None,
-        protect_session_blobs=True,
-        log_level=CASLogLevel.WARNING,
-        log_directory=None,
-        messenger=None
-    ):
+    def __init__(self, path, *, casd, remote_cache=False):
         self.casdir = os.path.join(path, "cas")
         self.tmpdir = os.path.join(path, "tmp")
         os.makedirs(self.tmpdir, exist_ok=True)
@@ -78,16 +63,10 @@ class CASCache:
         self._cache_usage_monitor = None
         self._cache_usage_monitor_forbidden = False
 
-        self._remote_cache = bool(remote_cache_spec)
+        self._remote_cache = remote_cache
 
-        self._casd = None
+        self._casd = casd
         if casd:
-            assert log_directory is not None, "log_directory is required when casd is True"
-            log_dir = os.path.join(log_directory, "_casd")
-            self._casd = CASDProcessManager(
-                path, log_dir, log_level, cache_quota, remote_cache_spec, protect_session_blobs, messenger
-            )
-
             self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd)
             self._cache_usage_monitor.start()
         else:
@@ -124,15 +103,11 @@ class CASCache:
     #
     # Release resources used by CASCache.
     #
-    def release_resources(self, messenger=None):
+    def release_resources(self):
         if self._cache_usage_monitor:
             self._cache_usage_monitor.stop()
             self._cache_usage_monitor.join()
 
-        if self._casd:
-            self._casd.release_resources(messenger)
-            self._casd = None
-
     def get_default_remote(self):
         return self._default_remote
 
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index b0f75a4a4..7093880c2 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -63,6 +63,8 @@ _REQUIRED_CASD_MICRO = 58
 #
 class CASDProcessManager:
     def __init__(self, path, log_dir, log_level, cache_quota, remote_cache_spec, protect_session_blobs, messenger):
+        os.makedirs(path, exist_ok=True)
+
         self._log_dir = log_dir
 
         self._socket_path = self._make_socket_path(path)
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index 545e5c37e..31b2df85f 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -30,7 +30,7 @@ from ._artifactcache import ArtifactCache
 from ._elementsourcescache import ElementSourcesCache
 from ._remotespec import RemoteSpec, RemoteExecutionSpec
 from ._sourcecache import SourceCache
-from ._cas import CASCache, CASLogLevel
+from ._cas import CASCache, CASDProcessManager, CASLogLevel
 from .types import _CacheBuildTrees, _PipelineSelection, _SchedulerErrorAction, _SourceUriPolicy
 from ._workspaces import Workspaces, WorkspaceProjectCache
 from .node import Node, MappingNode
@@ -223,6 +223,7 @@ class Context:
         self._project_overrides: MappingNode = Node.from_dict({})
         self._workspaces: Optional[Workspaces] = None
         self._workspace_project_cache: WorkspaceProjectCache = WorkspaceProjectCache()
+        self._casd: Optional[CASDProcessManager] = None
         self._cascache: Optional[CASCache] = None
 
     # __enter__()
@@ -247,7 +248,11 @@ class Context:
             self._sourcecache.release_resources()
 
         if self._cascache:
-            self._cascache.release_resources(self.messenger)
+            self._cascache.release_resources()
+
+        if self._casd:
+            self._casd.release_resources(self.messenger)
+            self._casd = None
 
     # load()
     #
@@ -677,8 +682,8 @@ class Context:
         # value which we cache here too.
         return self._strict_build_plan
 
-    def get_cascache(self) -> CASCache:
-        if self._cascache is None:
+    def get_casd(self) -> CASDProcessManager:
+        if self._casd is None:
             if self.log_debug:
                 log_level = CASLogLevel.TRACE
             elif self.log_verbose:
@@ -686,15 +691,22 @@ class Context:
             else:
                 log_level = CASLogLevel.WARNING
 
-            self._cascache = CASCache(
+            assert self.logdir is not None, "log_directory is required for casd"
+            log_dir = os.path.join(self.logdir, "_casd")
+            self._casd = CASDProcessManager(
                 self.cachedir,
-                casd=self.use_casd,
-                cache_quota=self.config_cache_quota,
-                remote_cache_spec=self.remote_cache_spec,
-                log_level=log_level,
-                log_directory=self.logdir,
+                log_dir,
+                log_level,
+                self.config_cache_quota,
+                self.remote_cache_spec,
+                protect_session_blobs=True,
                 messenger=self.messenger,
             )
+        return self._casd
+
+    def get_cascache(self) -> CASCache:
+        if self._cascache is None:
+            self._cascache = CASCache(self.cachedir, casd=self.get_casd(), remote_cache=bool(self.remote_cache_spec))
         return self._cascache
 
     ######################################################
diff --git a/src/buildstream/_testing/runcli.py b/src/buildstream/_testing/runcli.py
index 8daaf08b6..87682f009 100644
--- a/src/buildstream/_testing/runcli.py
+++ b/src/buildstream/_testing/runcli.py
@@ -725,7 +725,7 @@ class TestArtifact:
     def _extract_subdirectory(self, tmpdir, digest):
         with tempfile.TemporaryDirectory() as extractdir:
             try:
-                cas = CASCache(str(tmpdir), casd=False)
+                cas = CASCache(str(tmpdir), casd=None)
                 cas.checkout(extractdir, digest)
                 yield extractdir
             except FileNotFoundError:
diff --git a/tests/artifactcache/expiry.py b/tests/artifactcache/expiry.py
index 3fd312174..629e34601 100644
--- a/tests/artifactcache/expiry.py
+++ b/tests/artifactcache/expiry.py
@@ -22,20 +22,18 @@ import time
 
 import pytest
 
-from buildstream._cas import CASCache
 from buildstream.exceptions import ErrorDomain, LoadErrorReason
 from buildstream._testing import cli  # pylint: disable=unused-import
 from buildstream._testing._utils.site import have_subsecond_mtime
 
-from tests.testutils import create_element_size, wait_for_cache_granularity
+from tests.testutils import casd_cache, create_element_size, wait_for_cache_granularity
 
 
 DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "expiry")
 
 
 def get_cache_usage(directory):
-    cas_cache = CASCache(directory, log_directory=os.path.dirname(directory))
-    try:
+    with casd_cache(directory) as cas_cache:
         wait = 0.1
         for _ in range(0, int(5 / wait)):
             used_size = cas_cache.get_cache_usage().used_size
@@ -45,8 +43,6 @@ def get_cache_usage(directory):
 
         assert False, "Unable to retrieve cache usage"
         return None
-    finally:
-        cas_cache.release_resources()
 
 
 # Ensure that the cache successfully removes an old artifact if we do
diff --git a/tests/internals/cascache.py b/tests/internals/cascache.py
index 23f7575c4..6c51f9045 100644
--- a/tests/internals/cascache.py
+++ b/tests/internals/cascache.py
@@ -15,9 +15,9 @@ import os
 import time
 from unittest.mock import MagicMock
 
-from buildstream._cas.cascache import CASCache
 from buildstream._cas import casdprocessmanager
 from buildstream._messenger import Messenger
+from tests.testutils import casd_cache
 
 
 #
@@ -42,9 +42,8 @@ def test_report_when_cascache_dies_before_asked_to(tmp_path, monkeypatch):
     monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep)
 
     messenger = MagicMock(spec_set=Messenger)
-    cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs")))
-    time.sleep(1)
-    cache.release_resources(messenger)
+    with casd_cache(tmp_path.joinpath("casd"), messenger) as cache:
+        time.sleep(1)
 
     assert messenger.bug.call_count == 1
 
@@ -64,9 +63,8 @@ def test_report_when_cascache_exits_not_cleanly(tmp_path, monkeypatch):
     monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1)
 
     messenger = MagicMock(spec_set=Messenger)
-    cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs")))
-    time.sleep(1)
-    cache.release_resources(messenger)
+    with casd_cache(tmp_path.joinpath("casd"), messenger) as cache:
+        time.sleep(1)
 
     assert messenger.bug.call_count == 1
 
@@ -86,9 +84,8 @@ def test_report_when_cascache_is_forcefully_killed(tmp_path, monkeypatch):
     monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1)
 
     messenger = MagicMock(spec_set=Messenger)
-    cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs")))
-    time.sleep(1)
-    cache.release_resources(messenger)
+    with casd_cache(tmp_path.joinpath("casd"), messenger) as cache:
+        time.sleep(1)
 
     assert messenger.warn.call_count == 1
 
@@ -114,9 +111,8 @@ def test_casd_redirects_stderr_to_file_and_rotate(tmp_path, monkeypatch):
 
     # Let's create the first `n_max_log_files` log files
     for i in range(1, n_max_log_files + 1):
-        cache = CASCache(str(casd_files_path), casd=True, log_directory=str(casd_parent_logs_path))
-        time.sleep(0.5)
-        cache.release_resources()
+        with casd_cache(casd_files_path) as cache:
+            time.sleep(0.5)
 
         existing_log_files = sorted(casd_logs_path.iterdir())
         assert len(existing_log_files) == i
@@ -126,9 +122,8 @@ def test_casd_redirects_stderr_to_file_and_rotate(tmp_path, monkeypatch):
     for _ in range(3):
         evicted_file = existing_log_files.pop(0)
 
-        cache = CASCache(str(casd_files_path), casd=True, log_directory=str(casd_parent_logs_path))
-        time.sleep(0.5)
-        cache.release_resources()
+        with casd_cache(casd_files_path) as cache:
+            time.sleep(0.5)
 
         existing_log_files = sorted(casd_logs_path.iterdir())
         assert len(existing_log_files) == n_max_log_files
diff --git a/tests/internals/storage.py b/tests/internals/storage.py
index 49bb7043c..f3b4031cb 100644
--- a/tests/internals/storage.py
+++ b/tests/internals/storage.py
@@ -24,10 +24,11 @@ from typing import List, Optional
 import pytest
 
 from buildstream import DirectoryError, FileType
-from buildstream._cas import CASCache
 from buildstream.storage._casbaseddirectory import CasBasedDirectory
 from buildstream.storage._filebaseddirectory import FileBasedDirectory
 
+from tests.testutils import casd_cache
+
 DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "storage")
 
 
@@ -38,11 +39,8 @@ def setup_backend(backend_class, tmpdir):
         os.mkdir(path)
         yield backend_class(path)
     else:
-        cas_cache = CASCache(os.path.join(tmpdir, "cas"), log_directory=os.path.join(tmpdir, "logs"))
-        try:
+        with casd_cache(os.path.join(tmpdir, "cas")) as cas_cache:
             yield backend_class(cas_cache)
-        finally:
-            cas_cache.release_resources()
 
 
 @pytest.mark.parametrize("backend", [FileBasedDirectory, CasBasedDirectory])
diff --git a/tests/internals/storage_vdir_import.py b/tests/internals/storage_vdir_import.py
index e0ca134b6..637283e69 100644
--- a/tests/internals/storage_vdir_import.py
+++ b/tests/internals/storage_vdir_import.py
@@ -20,10 +20,11 @@ import pytest
 from buildstream import DirectoryError
 from buildstream.storage._casbaseddirectory import CasBasedDirectory
 from buildstream.storage._filebaseddirectory import FileBasedDirectory
-from buildstream._cas import CASCache
 from buildstream.utils import _set_file_mtime
 from buildstream._testing._utils.site import have_subsecond_mtime
 
+from tests.testutils import casd_cache
+
 
 # These are comparitive tests that check that FileBasedDirectory and
 # CasBasedDirectory act identically.
@@ -189,8 +190,7 @@ def _import_test(tmpdir, original, overlay, generator_function, verify_contents=
     if not have_subsecond_mtime(str(tmpdir)):
         pytest.skip("Filesystem does not support subsecond mtime precision: {}".format(str(tmpdir)))
 
-    cas_cache = CASCache(tmpdir, log_directory=os.path.join(tmpdir, "logs"))
-    try:
+    with casd_cache(os.path.join(tmpdir, "casd")) as cas_cache:
         # Create some fake content
         generator_function(original, tmpdir)
         if original != overlay:
@@ -245,8 +245,6 @@ def _import_test(tmpdir, original, overlay, generator_function, verify_contents=
         duplicate_cas._import_files_internal(roundtrip_dir, properties=["mtime"])
 
         assert duplicate_cas._get_digest().hash == d._get_digest().hash
-    finally:
-        cas_cache.release_resources()
 
 
 @pytest.mark.parametrize("original", range(1, len(root_filesets) + 1))
@@ -262,8 +260,7 @@ def test_random_cas_import(tmpdir, original, overlay):
 
 
 def _listing_test(tmpdir, root, generator_function):
-    cas_cache = CASCache(tmpdir, log_directory=os.path.join(tmpdir, "logs"))
-    try:
+    with casd_cache(os.path.join(tmpdir, "casd")) as cas_cache:
         # Create some fake content
         generator_function(root, tmpdir)
 
@@ -274,8 +271,6 @@ def _listing_test(tmpdir, root, generator_function):
         filelist2 = list(d2.list_relative_paths())
 
         assert filelist == filelist2
-    finally:
-        cas_cache.release_resources()
 
 
 @pytest.mark.parametrize("root", range(1, NUM_RANDOM_TESTS + 1))
@@ -291,8 +286,7 @@ def test_fixed_directory_listing(tmpdir, root):
 # Check that the vdir is decending and readable
 def test_open_directory(tmpdir):
     cas_dir = os.path.join(str(tmpdir), "cas")
-    cas_cache = CASCache(cas_dir, log_directory=os.path.join(str(tmpdir), "logs"))
-    try:
+    with casd_cache(cas_dir) as cas_cache:
         d = CasBasedDirectory(cas_cache)
 
         Content_to_check = "You got me"
@@ -306,8 +300,6 @@ def test_open_directory(tmpdir):
         with open(cas_cache.objpath(digest), encoding="utf-8") as fp:
             content = fp.read()
         assert Content_to_check == content
-    finally:
-        cas_cache.release_resources()
 
 
 # Check symlink logic for edgecases
@@ -315,8 +307,7 @@ def test_open_directory(tmpdir):
 # to decend in to files or links to files
 def test_bad_symlinks(tmpdir):
     cas_dir = os.path.join(str(tmpdir), "cas")
-    cas_cache = CASCache(cas_dir, log_directory=os.path.join(str(tmpdir), "logs"))
-    try:
+    with casd_cache(cas_dir) as cas_cache:
         d = CasBasedDirectory(cas_cache)
 
         test_dir = os.path.join(str(tmpdir), "importfrom")
@@ -336,16 +327,13 @@ def test_bad_symlinks(tmpdir):
         with pytest.raises(DirectoryError) as error:
             d.open_directory("a/f")
             assert error.reason == exp_reason
-    finally:
-        cas_cache.release_resources()
 
 
 # Check symlink logic for edgecases
 # Check decend accross relitive link
 def test_relative_symlink(tmpdir):
     cas_dir = os.path.join(str(tmpdir), "cas")
-    cas_cache = CASCache(cas_dir, log_directory=os.path.join(str(tmpdir), "logs"))
-    try:
+    with casd_cache(cas_dir) as cas_cache:
         d = CasBasedDirectory(cas_cache)
 
         Content_to_check = "You got me"
@@ -363,16 +351,13 @@ def test_relative_symlink(tmpdir):
         with open(cas_cache.objpath(digest), encoding="utf-8") as fp:
             content = fp.read()
         assert Content_to_check == content
-    finally:
-        cas_cache.release_resources()
 
 
 # Check symlink logic for edgecases
 # Check deccend accross abs link
 def test_abs_symlink(tmpdir):
     cas_dir = os.path.join(str(tmpdir), "cas")
-    cas_cache = CASCache(cas_dir, log_directory=os.path.join(str(tmpdir), "logs"))
-    try:
+    with casd_cache(cas_dir) as cas_cache:
         d = CasBasedDirectory(cas_cache)
 
         Content_to_check = "two step file"
@@ -391,16 +376,13 @@ def test_abs_symlink(tmpdir):
         with open(cas_cache.objpath(digest), encoding="utf-8") as fp:
             content = fp.read()
         assert Content_to_check == content
-    finally:
-        cas_cache.release_resources()
 
 
 # Check symlink logic for edgecases
 # Check symlink can not escape root
 def test_bad_sym_escape(tmpdir):
     cas_dir = os.path.join(str(tmpdir), "cas")
-    cas_cache = CASCache(cas_dir, log_directory=os.path.join(str(tmpdir), "logs"))
-    try:
+    with casd_cache(cas_dir) as cas_cache:
         d = CasBasedDirectory(cas_cache)
 
         test_dir = os.path.join(str(tmpdir), "importfrom")
@@ -417,5 +399,3 @@ def test_bad_sym_escape(tmpdir):
         with pytest.raises(DirectoryError) as error:
             d.open_directory("a/l", follow_symlinks=True)
             assert error.reason == "directory-not-found"
-    finally:
-        cas_cache.release_resources()
diff --git a/tests/testutils/__init__.py b/tests/testutils/__init__.py
index 028131e4b..03eb422ea 100644
--- a/tests/testutils/__init__.py
+++ b/tests/testutils/__init__.py
@@ -20,6 +20,7 @@
 #
 
 from .artifactshare import create_artifact_share, create_split_share, assert_shared, assert_not_shared, ArtifactShare
+from .casd import casd_cache
 from .context import dummy_context
 from .element_generators import create_element_size
 from .junction import generate_junction
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index 515164aac..fcdbcb597 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -142,7 +142,7 @@ class ArtifactShare(BaseArtifactShare):
         super().__init__()
 
         # Set after subprocess creation as it's not picklable
-        self.cas = CASCache(self.repodir, casd=False)
+        self.cas = CASCache(self.repodir, casd=None)
 
     def _create_server(self):
         return create_server(
diff --git a/src/buildstream/_cas/__init__.py b/tests/testutils/casd.py
similarity index 50%
copy from src/buildstream/_cas/__init__.py
copy to tests/testutils/casd.py
index 5f9c4abb1..9a3eb8e71 100644
--- a/src/buildstream/_cas/__init__.py
+++ b/tests/testutils/casd.py
@@ -10,9 +10,23 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
-#
-#  Authors:
-#        Tristan Van Berkom <[email protected]>
 
-from .cascache import CASCache, CASLogLevel
-from .casremote import CASRemote
+import os
+from contextlib import contextmanager
+
+from buildstream._cas import CASCache, CASDProcessManager, CASLogLevel
+
+
+@contextmanager
+def casd_cache(path, messenger=None):
+    casd = CASDProcessManager(
+        str(path), os.path.join(str(path), "..", "logs", "_casd"), CASLogLevel.WARNING, None, None, True, None
+    )
+    try:
+        cascache = CASCache(str(path), casd=casd)
+        try:
+            yield cascache
+        finally:
+            cascache.release_resources()
+    finally:
+        casd.release_resources(messenger)
