This is an automated email from the ASF dual-hosted git repository.

juergbi pushed a commit to branch juerg/buildbox-asset-remote
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 6e947605d6af1e55f3c4d77db5553713755c67e9
Author: Jürg Billeter <[email protected]>
AuthorDate: Fri May 31 12:32:18 2024 +0200

    _cas: Merge `CASDChannel` into `CASDProcessManager`
    
    There is only ever a single instance of `CASDChannel` and it's always
    created right after creating `CASDProcessManager`. This is in
    preparation of using buildbox-casd outside of `CASCache`.
---
 src/buildstream/_cas/cascache.py               | 48 ++++++---------
 src/buildstream/_cas/casdprocessmanager.py     | 83 +++++++-------------------
 src/buildstream/_cas/casserver.py              | 15 ++---
 src/buildstream/_stream.py                     |  2 +-
 src/buildstream/sandbox/_sandboxbuildboxrun.py |  4 +-
 tests/artifactcache/pull.py                    |  3 -
 6 files changed, 48 insertions(+), 107 deletions(-)

diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 42937c1d6..8110337bc 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -80,17 +80,15 @@ class CASCache:
 
         self._remote_cache = bool(remote_cache_spec)
 
-        self._casd_process_manager = None
-        self._casd_channel = None
+        self._casd = None
         if casd:
             assert log_directory is not None, "log_directory is required when 
casd is True"
             log_dir = os.path.join(log_directory, "_casd")
-            self._casd_process_manager = CASDProcessManager(
+            self._casd = CASDProcessManager(
                 path, log_dir, log_level, cache_quota, remote_cache_spec, 
protect_session_blobs, messenger
             )
 
-            self._casd_channel = self._casd_process_manager.create_channel()
-            self._cache_usage_monitor = 
_CASCacheUsageMonitor(self._casd_channel)
+            self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd)
             self._cache_usage_monitor.start()
         else:
             assert not self._remote_cache
@@ -103,16 +101,16 @@ class CASCache:
     # Return ContentAddressableStorage stub for buildbox-casd channel.
     #
     def get_cas(self):
-        assert self._casd_channel, "CASCache was created without a channel"
-        return self._casd_channel.get_cas()
+        assert self._casd, "CASCache was created without buildbox-casd"
+        return self._casd.get_cas()
 
     # get_local_cas():
     #
     # Return LocalCAS stub for buildbox-casd channel.
     #
     def get_local_cas(self):
-        assert self._casd_channel, "CASCache was created without a channel"
-        return self._casd_channel.get_local_cas()
+        assert self._casd, "CASCache was created without buildbox-casd"
+        return self._casd.get_local_cas()
 
     # preflight():
     #
@@ -122,30 +120,18 @@ class CASCache:
         if not os.path.join(self.casdir, "objects"):
             raise CASCacheError("CAS repository check failed for 
'{}'".format(self.casdir))
 
-    # close_grpc_channels():
-    #
-    # Close the casd channel if it exists
-    #
-    def close_grpc_channels(self):
-        if self._casd_channel:
-            self._casd_channel.close()
-
     # release_resources():
     #
     # Release resources used by CASCache.
     #
     def release_resources(self, messenger=None):
-        if self._casd_channel:
-            self._casd_channel.request_shutdown()
-
         if self._cache_usage_monitor:
             self._cache_usage_monitor.stop()
             self._cache_usage_monitor.join()
 
-        if self._casd_process_manager:
-            self.close_grpc_channels()
-            self._casd_process_manager.release_resources(messenger)
-            self._casd_process_manager = None
+        if self._casd:
+            self._casd.release_resources(messenger)
+            self._casd = None
 
     def get_default_remote(self):
         return self._default_remote
@@ -733,16 +719,16 @@ class CASCache:
         assert not self._cache_usage_monitor_forbidden
         return self._cache_usage_monitor.get_cache_usage()
 
-    # get_casd_process_manager()
+    # get_casd()
     #
     # Get the underlying buildbox-casd process
     #
     # Returns:
     #   (subprocess.Process): The casd process that is used for the current 
cascache
     #
-    def get_casd_process_manager(self):
-        assert self._casd_process_manager is not None, "Only call this with a 
running buildbox-casd process"
-        return self._casd_process_manager
+    def get_casd(self):
+        assert self._casd is not None, "Only call this with a running 
buildbox-casd process"
+        return self._casd
 
 
 # _CASCacheUsage
@@ -783,9 +769,9 @@ class _CASCacheUsage:
 # buildbox-casd.
 #
 class _CASCacheUsageMonitor(threading.Thread):
-    def __init__(self, connection):
+    def __init__(self, casd):
         super().__init__()
-        self._connection = connection
+        self._casd = casd
         self._disk_usage = None
         self._disk_quota = None
         self._should_stop = False
@@ -797,7 +783,7 @@ class _CASCacheUsageMonitor(threading.Thread):
         self._should_stop = True
 
     def run(self):
-        local_cas = self._connection.get_local_cas()
+        local_cas = self._casd.get_local_cas()
 
         while not self._should_stop:
             try:
diff --git a/src/buildstream/_cas/casdprocessmanager.py 
b/src/buildstream/_cas/casdprocessmanager.py
index 8fa9861a5..b0f75a4a4 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -109,6 +109,16 @@ class CASDProcessManager:
                 env=self.__buildbox_casd_env(),
             )
 
+        self._casd_channel = None
+        self._bytestream = None
+        self._casd_cas = None
+        self._local_cas = None
+        self._asset_fetch = None
+        self._asset_push = None
+        self._shutdown_requested = False
+
+        self._lock = threading.Lock()
+
     def __buildbox_casd(self):
         return utils._get_host_tool_internal("buildbox-casd", 
search_subprojects_dir="buildbox")
 
@@ -259,6 +269,17 @@ class CASDProcessManager:
     # Terminate the process and release related resources.
     #
     def release_resources(self, messenger=None):
+        self._shutdown_requested = True
+        with self._lock:
+            if self._casd_channel:
+                self._asset_push = None
+                self._asset_fetch = None
+                self._local_cas = None
+                self._casd_cas = None
+                self._bytestream = None
+                self._casd_channel.close()
+                self._casd_channel = None
+
         self._terminate(messenger)
         self.process = None
         shutil.rmtree(self._socket_tempdir)
@@ -305,31 +326,6 @@ class CASDProcessManager:
                 "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: 
{}".format(return_code, self._logfile)
             )
 
-    # create_channel():
-    #
-    # Return a CASDChannel, note that the actual connection is not necessarily
-    # established until it is needed.
-    #
-    def create_channel(self):
-        return CASDChannel(self._socket_path, self._connection_string, 
self._start_time, self.process.pid)
-
-
-class CASDChannel:
-    def __init__(self, socket_path, connection_string, start_time, casd_pid):
-        self._socket_path = socket_path
-        self._connection_string = connection_string
-        self._start_time = start_time
-        self._casd_channel = None
-        self._bytestream = None
-        self._casd_cas = None
-        self._local_cas = None
-        self._asset_fetch = None
-        self._asset_push = None
-        self._casd_pid = casd_pid
-        self._shutdown_requested = False
-
-        self._lock = threading.Lock()
-
     def _establish_connection(self):
         with self._lock:
             if self._casd_channel is not None:
@@ -347,7 +343,7 @@ class CASDChannel:
 
                 # check that process is still alive
                 try:
-                    proc = psutil.Process(self._casd_pid)
+                    proc = psutil.Process(self.process.pid)
                     if proc.status() == psutil.STATUS_ZOMBIE:
                         proc.wait()
 
@@ -409,38 +405,3 @@ class CASDChannel:
         if self._casd_channel is None:
             self._establish_connection()
         return self._asset_push
-
-    # is_closed():
-    #
-    # Return whether this connection is closed or not.
-    #
-    def is_closed(self):
-        return self._casd_channel is None
-
-    # request_shutdown():
-    #
-    # Notify the channel that a shutdown of casd was requested.
-    #
-    # Thus we know that not being able to establish a connection is expected
-    # and no error will be reported in that case.
-    def request_shutdown(self) -> None:
-        self._shutdown_requested = True
-
-    # close():
-    #
-    # Close the casd channel.
-    #
-    def close(self):
-        assert self._shutdown_requested, "Please request shutdown before 
closing"
-
-        with self._lock:
-            if self.is_closed():
-                return
-
-            self._asset_push = None
-            self._asset_fetch = None
-            self._local_cas = None
-            self._casd_cas = None
-            self._bytestream = None
-            self._casd_channel.close()
-            self._casd_channel = None
diff --git a/src/buildstream/_cas/casserver.py 
b/src/buildstream/_cas/casserver.py
index 4e46b4fd6..6570df75f 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -90,10 +90,9 @@ def create_server(repo, *, enable_push, quota, index_only, 
log_level=LogLevel.Le
     handler.setFormatter(logging.Formatter(fmt="%(levelname)s: %(funcName)s: 
%(message)s"))
     logger.addHandler(handler)
 
-    casd_manager = CASDProcessManager(
+    casd = CASDProcessManager(
         os.path.abspath(repo), os.path.join(os.path.abspath(repo), "logs"), 
log_level, quota, None, False, None
     )
-    casd_channel = casd_manager.create_channel()
 
     try:
         # Use max_workers default from Python 3.5+
@@ -102,19 +101,19 @@ def create_server(repo, *, enable_push, quota, 
index_only, log_level=LogLevel.Le
 
         if not index_only:
             bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-                _ByteStreamServicer(casd_channel, enable_push=enable_push), 
server
+                _ByteStreamServicer(casd, enable_push=enable_push), server
             )
 
             
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-                _ContentAddressableStorageServicer(casd_channel, 
enable_push=enable_push), server
+                _ContentAddressableStorageServicer(casd, 
enable_push=enable_push), server
             )
 
         
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(_CapabilitiesServicer(),
 server)
 
         # Remote Asset API
-        
remote_asset_pb2_grpc.add_FetchServicer_to_server(_FetchServicer(casd_channel), 
server)
+        
remote_asset_pb2_grpc.add_FetchServicer_to_server(_FetchServicer(casd), server)
         if enable_push:
-            
remote_asset_pb2_grpc.add_PushServicer_to_server(_PushServicer(casd_channel), 
server)
+            
remote_asset_pb2_grpc.add_PushServicer_to_server(_PushServicer(casd), server)
 
         # Ensure we have the signal handler set for SIGTERM
         # This allows threads from GRPC to call our methods that do register
@@ -123,9 +122,7 @@ def create_server(repo, *, enable_push, quota, index_only, 
log_level=LogLevel.Le
             yield server
 
     finally:
-        casd_channel.request_shutdown()
-        casd_channel.close()
-        casd_manager.release_resources()
+        casd.release_resources()
 
 
 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 159436914..075f25bcb 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1799,7 +1799,7 @@ class Stream:
             self._session_start_callback()
 
         self._running = True
-        status = self._scheduler.run(self.queues, 
self._context.get_cascache().get_casd_process_manager())
+        status = self._scheduler.run(self.queues, 
self._context.get_cascache().get_casd())
         self._running = False
 
         if status == SchedStatus.ERROR:
diff --git a/src/buildstream/sandbox/_sandboxbuildboxrun.py 
b/src/buildstream/sandbox/_sandboxbuildboxrun.py
index 1987ebac1..42f206459 100644
--- a/src/buildstream/sandbox/_sandboxbuildboxrun.py
+++ b/src/buildstream/sandbox/_sandboxbuildboxrun.py
@@ -87,7 +87,7 @@ class SandboxBuildBoxRun(SandboxREAPI):
 
         context = self._get_context()
         cascache = context.get_cascache()
-        casd_process_manager = cascache.get_casd_process_manager()
+        casd = cascache.get_casd()
 
         with utils._tempnamedfile() as action_file, utils._tempnamedfile() as 
result_file:
             action_file.write(action.SerializeToString())
@@ -95,7 +95,7 @@ class SandboxBuildBoxRun(SandboxREAPI):
 
             buildbox_command = [
                 self.__buildbox_run(),
-                "--remote={}".format(casd_process_manager._connection_string),
+                "--remote={}".format(casd._connection_string),
                 "--action={}".format(action_file.name),
                 "--action-result={}".format(result_file.name),
             ]
diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py
index 0c9960c1a..114957b85 100644
--- a/tests/artifactcache/pull.py
+++ b/tests/artifactcache/pull.py
@@ -203,9 +203,6 @@ def test_pull_tree(cli, tmpdir, datafiles):
             cli.remove_artifact_from_cache(project_dir, "target.bst")
 
             # Assert that we are not cached locally anymore
-            artifactcache.release_resources()
-            cas._casd_channel.request_shutdown()
-            cas.close_grpc_channels()
             assert cli.get_element_state(project_dir, "target.bst") != "cached"
 
             tree_digest = remote_execution_pb2.Digest(hash=tree_hash, 
size_bytes=tree_size)

Reply via email to