This is an automated email from the ASF dual-hosted git repository. juergbi pushed a commit to branch juerg/buildbox-asset-remote in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 6e947605d6af1e55f3c4d77db5553713755c67e9 Author: Jürg Billeter <[email protected]> AuthorDate: Fri May 31 12:32:18 2024 +0200 _cas: Merge `CASDChannel` into `CASDProcessManager` There is only ever a single instance of `CASDChannel` and it's always created right after creating `CASDProcessManager`. This is in preparation of using buildbox-casd outside of `CASCache`. --- src/buildstream/_cas/cascache.py | 48 ++++++--------- src/buildstream/_cas/casdprocessmanager.py | 83 +++++++------------------- src/buildstream/_cas/casserver.py | 15 ++--- src/buildstream/_stream.py | 2 +- src/buildstream/sandbox/_sandboxbuildboxrun.py | 4 +- tests/artifactcache/pull.py | 3 - 6 files changed, 48 insertions(+), 107 deletions(-) diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index 42937c1d6..8110337bc 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -80,17 +80,15 @@ class CASCache: self._remote_cache = bool(remote_cache_spec) - self._casd_process_manager = None - self._casd_channel = None + self._casd = None if casd: assert log_directory is not None, "log_directory is required when casd is True" log_dir = os.path.join(log_directory, "_casd") - self._casd_process_manager = CASDProcessManager( + self._casd = CASDProcessManager( path, log_dir, log_level, cache_quota, remote_cache_spec, protect_session_blobs, messenger ) - self._casd_channel = self._casd_process_manager.create_channel() - self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd_channel) + self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd) self._cache_usage_monitor.start() else: assert not self._remote_cache @@ -103,16 +101,16 @@ class CASCache: # Return ContentAddressableStorage stub for buildbox-casd channel. # def get_cas(self): - assert self._casd_channel, "CASCache was created without a channel" - return self._casd_channel.get_cas() + assert self._casd, "CASCache was created without buildbox-casd" + return self._casd.get_cas() # get_local_cas(): # # Return LocalCAS stub for buildbox-casd channel. # def get_local_cas(self): - assert self._casd_channel, "CASCache was created without a channel" - return self._casd_channel.get_local_cas() + assert self._casd, "CASCache was created without buildbox-casd" + return self._casd.get_local_cas() # preflight(): # @@ -122,30 +120,18 @@ class CASCache: if not os.path.join(self.casdir, "objects"): raise CASCacheError("CAS repository check failed for '{}'".format(self.casdir)) - # close_grpc_channels(): - # - # Close the casd channel if it exists - # - def close_grpc_channels(self): - if self._casd_channel: - self._casd_channel.close() - # release_resources(): # # Release resources used by CASCache. # def release_resources(self, messenger=None): - if self._casd_channel: - self._casd_channel.request_shutdown() - if self._cache_usage_monitor: self._cache_usage_monitor.stop() self._cache_usage_monitor.join() - if self._casd_process_manager: - self.close_grpc_channels() - self._casd_process_manager.release_resources(messenger) - self._casd_process_manager = None + if self._casd: + self._casd.release_resources(messenger) + self._casd = None def get_default_remote(self): return self._default_remote @@ -733,16 +719,16 @@ class CASCache: assert not self._cache_usage_monitor_forbidden return self._cache_usage_monitor.get_cache_usage() - # get_casd_process_manager() + # get_casd() # # Get the underlying buildbox-casd process # # Returns: # (subprocess.Process): The casd process that is used for the current cascache # - def get_casd_process_manager(self): - assert self._casd_process_manager is not None, "Only call this with a running buildbox-casd process" - return self._casd_process_manager + def get_casd(self): + assert self._casd is not None, "Only call this with a running buildbox-casd process" + return self._casd # _CASCacheUsage @@ -783,9 +769,9 @@ class _CASCacheUsage: # buildbox-casd. # class _CASCacheUsageMonitor(threading.Thread): - def __init__(self, connection): + def __init__(self, casd): super().__init__() - self._connection = connection + self._casd = casd self._disk_usage = None self._disk_quota = None self._should_stop = False @@ -797,7 +783,7 @@ class _CASCacheUsageMonitor(threading.Thread): self._should_stop = True def run(self): - local_cas = self._connection.get_local_cas() + local_cas = self._casd.get_local_cas() while not self._should_stop: try: diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py index 8fa9861a5..b0f75a4a4 100644 --- a/src/buildstream/_cas/casdprocessmanager.py +++ b/src/buildstream/_cas/casdprocessmanager.py @@ -109,6 +109,16 @@ class CASDProcessManager: env=self.__buildbox_casd_env(), ) + self._casd_channel = None + self._bytestream = None + self._casd_cas = None + self._local_cas = None + self._asset_fetch = None + self._asset_push = None + self._shutdown_requested = False + + self._lock = threading.Lock() + def __buildbox_casd(self): return utils._get_host_tool_internal("buildbox-casd", search_subprojects_dir="buildbox") @@ -259,6 +269,17 @@ class CASDProcessManager: # Terminate the process and release related resources. # def release_resources(self, messenger=None): + self._shutdown_requested = True + with self._lock: + if self._casd_channel: + self._asset_push = None + self._asset_fetch = None + self._local_cas = None + self._casd_cas = None + self._bytestream = None + self._casd_channel.close() + self._casd_channel = None + self._terminate(messenger) self.process = None shutil.rmtree(self._socket_tempdir) @@ -305,31 +326,6 @@ class CASDProcessManager: "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(return_code, self._logfile) ) - # create_channel(): - # - # Return a CASDChannel, note that the actual connection is not necessarily - # established until it is needed. - # - def create_channel(self): - return CASDChannel(self._socket_path, self._connection_string, self._start_time, self.process.pid) - - -class CASDChannel: - def __init__(self, socket_path, connection_string, start_time, casd_pid): - self._socket_path = socket_path - self._connection_string = connection_string - self._start_time = start_time - self._casd_channel = None - self._bytestream = None - self._casd_cas = None - self._local_cas = None - self._asset_fetch = None - self._asset_push = None - self._casd_pid = casd_pid - self._shutdown_requested = False - - self._lock = threading.Lock() - def _establish_connection(self): with self._lock: if self._casd_channel is not None: @@ -347,7 +343,7 @@ class CASDChannel: # check that process is still alive try: - proc = psutil.Process(self._casd_pid) + proc = psutil.Process(self.process.pid) if proc.status() == psutil.STATUS_ZOMBIE: proc.wait() @@ -409,38 +405,3 @@ class CASDChannel: if self._casd_channel is None: self._establish_connection() return self._asset_push - - # is_closed(): - # - # Return whether this connection is closed or not. - # - def is_closed(self): - return self._casd_channel is None - - # request_shutdown(): - # - # Notify the channel that a shutdown of casd was requested. - # - # Thus we know that not being able to establish a connection is expected - # and no error will be reported in that case. - def request_shutdown(self) -> None: - self._shutdown_requested = True - - # close(): - # - # Close the casd channel. - # - def close(self): - assert self._shutdown_requested, "Please request shutdown before closing" - - with self._lock: - if self.is_closed(): - return - - self._asset_push = None - self._asset_fetch = None - self._local_cas = None - self._casd_cas = None - self._bytestream = None - self._casd_channel.close() - self._casd_channel = None diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py index 4e46b4fd6..6570df75f 100644 --- a/src/buildstream/_cas/casserver.py +++ b/src/buildstream/_cas/casserver.py @@ -90,10 +90,9 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.Le handler.setFormatter(logging.Formatter(fmt="%(levelname)s: %(funcName)s: %(message)s")) logger.addHandler(handler) - casd_manager = CASDProcessManager( + casd = CASDProcessManager( os.path.abspath(repo), os.path.join(os.path.abspath(repo), "logs"), log_level, quota, None, False, None ) - casd_channel = casd_manager.create_channel() try: # Use max_workers default from Python 3.5+ @@ -102,19 +101,19 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.Le if not index_only: bytestream_pb2_grpc.add_ByteStreamServicer_to_server( - _ByteStreamServicer(casd_channel, enable_push=enable_push), server + _ByteStreamServicer(casd, enable_push=enable_push), server ) remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server( - _ContentAddressableStorageServicer(casd_channel, enable_push=enable_push), server + _ContentAddressableStorageServicer(casd, enable_push=enable_push), server ) remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(_CapabilitiesServicer(), server) # Remote Asset API - remote_asset_pb2_grpc.add_FetchServicer_to_server(_FetchServicer(casd_channel), server) + remote_asset_pb2_grpc.add_FetchServicer_to_server(_FetchServicer(casd), server) if enable_push: - remote_asset_pb2_grpc.add_PushServicer_to_server(_PushServicer(casd_channel), server) + remote_asset_pb2_grpc.add_PushServicer_to_server(_PushServicer(casd), server) # Ensure we have the signal handler set for SIGTERM # This allows threads from GRPC to call our methods that do register @@ -123,9 +122,7 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.Le yield server finally: - casd_channel.request_shutdown() - casd_channel.close() - casd_manager.release_resources() + casd.release_resources() class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 159436914..075f25bcb 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1799,7 +1799,7 @@ class Stream: self._session_start_callback() self._running = True - status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager()) + status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd()) self._running = False if status == SchedStatus.ERROR: diff --git a/src/buildstream/sandbox/_sandboxbuildboxrun.py b/src/buildstream/sandbox/_sandboxbuildboxrun.py index 1987ebac1..42f206459 100644 --- a/src/buildstream/sandbox/_sandboxbuildboxrun.py +++ b/src/buildstream/sandbox/_sandboxbuildboxrun.py @@ -87,7 +87,7 @@ class SandboxBuildBoxRun(SandboxREAPI): context = self._get_context() cascache = context.get_cascache() - casd_process_manager = cascache.get_casd_process_manager() + casd = cascache.get_casd() with utils._tempnamedfile() as action_file, utils._tempnamedfile() as result_file: action_file.write(action.SerializeToString()) @@ -95,7 +95,7 @@ class SandboxBuildBoxRun(SandboxREAPI): buildbox_command = [ self.__buildbox_run(), - "--remote={}".format(casd_process_manager._connection_string), + "--remote={}".format(casd._connection_string), "--action={}".format(action_file.name), "--action-result={}".format(result_file.name), ] diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py index 0c9960c1a..114957b85 100644 --- a/tests/artifactcache/pull.py +++ b/tests/artifactcache/pull.py @@ -203,9 +203,6 @@ def test_pull_tree(cli, tmpdir, datafiles): cli.remove_artifact_from_cache(project_dir, "target.bst") # Assert that we are not cached locally anymore - artifactcache.release_resources() - cas._casd_channel.request_shutdown() - cas.close_grpc_channels() assert cli.get_element_state(project_dir, "target.bst") != "cached" tree_digest = remote_execution_pb2.Digest(hash=tree_hash, size_bytes=tree_size)
