This is an automated email from the ASF dual-hosted git repository. juergbi pushed a commit to branch juerg/remote-cache-ci in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 0de2a8ab8f26b24cc7a6f48077ef77046545f811 Author: Jürg Billeter <[email protected]> AuthorDate: Tue Oct 20 09:34:29 2020 +0200 _sandboxremote.py: Make storage-service optional with remote cache --- src/buildstream/_context.py | 2 +- src/buildstream/_remotespec.py | 22 +++++-- src/buildstream/sandbox/_sandboxremote.py | 102 ++++++++++++++++-------------- 3 files changed, 72 insertions(+), 54 deletions(-) diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index 8c409ec..88ac785 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -749,4 +749,4 @@ class Context: os.environ["XDG_DATA_HOME"] = os.path.expanduser("~/.local/share") def _load_remote_execution(self, node: MappingNode) -> Optional[RemoteExecutionSpec]: - return RemoteExecutionSpec.new_from_node(node) + return RemoteExecutionSpec.new_from_node(node, remote_cache=bool(self.remote_cache_spec)) diff --git a/src/buildstream/_remotespec.py b/src/buildstream/_remotespec.py index a37698e..ea8125a 100644 --- a/src/buildstream/_remotespec.py +++ b/src/buildstream/_remotespec.py @@ -454,9 +454,11 @@ class RemoteSpec: # communicate with various components of an RE build cluster. # class RemoteExecutionSpec: - def __init__(self, exec_spec: RemoteSpec, storage_spec: RemoteSpec, action_spec: Optional[RemoteSpec]) -> None: + def __init__( + self, exec_spec: RemoteSpec, storage_spec: Optional[RemoteSpec], action_spec: Optional[RemoteSpec] + ) -> None: self.exec_spec: RemoteSpec = exec_spec - self.storage_spec: RemoteSpec = storage_spec + self.storage_spec: Optional[RemoteSpec] = storage_spec self.action_spec: Optional[RemoteSpec] = action_spec # new_from_node(): @@ -474,15 +476,25 @@ class RemoteExecutionSpec: # LoadError: If the node is malformed. # @classmethod - def new_from_node(cls, node: MappingNode, basedir: Optional[str] = None) -> "RemoteExecutionSpec": + def new_from_node( + cls, node: MappingNode, basedir: Optional[str] = None, *, remote_cache: bool = False + ) -> "RemoteExecutionSpec": node.validate_keys(["execution-service", "storage-service", "action-cache-service"]) exec_node = node.get_mapping("execution-service") - storage_node = node.get_mapping("storage-service") + if remote_cache: + storage_node = node.get_mapping("storage-service", default=None) + else: + storage_node = node.get_mapping("storage-service") action_node = node.get_mapping("action-cache-service", default=None) exec_spec = RemoteSpec.new_from_node(exec_node, basedir, remote_execution=True) - storage_spec = RemoteSpec.new_from_node(storage_node, basedir, remote_execution=True) + + storage_spec: Optional[RemoteSpec] + if storage_node: + storage_spec = RemoteSpec.new_from_node(storage_node, basedir, remote_execution=True) + else: + storage_spec = None action_spec: Optional[RemoteSpec] if action_node: diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index e0a3a2d..d1e1c8f 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -42,6 +42,8 @@ class SandboxRemote(SandboxREAPI): super().__init__(*args, **kwargs) context = self._get_context() + cascache = context.get_cascache() + specs = context.remote_execution_specs if specs is None: return @@ -51,6 +53,17 @@ class SandboxRemote(SandboxREAPI): self.action_spec = specs.action_spec self.operation_name = None + if self.storage_spec: + self.storage_remote = CASRemote(self.storage_spec, cascache) + try: + self.storage_remote.init() + except grpc.RpcError as e: + raise SandboxError( + "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_spec.url, e) + ) from e + else: + self.storage_remote = cascache.get_default_remote() + def run_remote_command(self, channel, action_digest): # Sends an execution request to the remote execution server. # @@ -141,13 +154,13 @@ class SandboxRemote(SandboxREAPI): cascache = context.get_cascache() # Fetch the file blobs - dir_digest = vdir._get_digest() - required_blobs = cascache.required_blobs_for_directory(dir_digest) + if self.storage_spec: + dir_digest = vdir._get_digest() + required_blobs = cascache.required_blobs_for_directory(dir_digest) - local_missing_blobs = cascache.missing_blobs(required_blobs) - if local_missing_blobs: - with CASRemote(self.storage_spec, cascache) as casremote: - cascache.fetch_blobs(casremote, local_missing_blobs) + local_missing_blobs = cascache.missing_blobs(required_blobs) + if local_missing_blobs: + cascache.fetch_blobs(self.storage_remote, local_missing_blobs) def _execute_action(self, action, flags): stdout, stderr = self._get_output() @@ -159,46 +172,40 @@ class SandboxRemote(SandboxREAPI): action_digest = cascache.add_object(buffer=action.SerializeToString()) + casremote = self.storage_remote + # check action cache download and download if there action_result = self._check_action_cache(action_digest) if not action_result: - with CASRemote(self.storage_spec, cascache) as casremote: + with self._get_context().messenger.timed_activity( + "Uploading input root", element_name=self._get_element_name() + ): + # Determine blobs missing on remote try: - casremote.init() + input_root_digest = action.input_root_digest + missing_blobs = list(cascache.missing_blobs_for_directory(input_root_digest, remote=casremote)) except grpc.RpcError as e: - raise SandboxError( - "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_spec.url, e) - ) from e + raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e - with self._get_context().messenger.timed_activity( - "Uploading input root", element_name=self._get_element_name() - ): - # Determine blobs missing on remote - try: - input_root_digest = action.input_root_digest - missing_blobs = list(cascache.missing_blobs_for_directory(input_root_digest, remote=casremote)) - except grpc.RpcError as e: - raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e - - # Check if any blobs are also missing locally (partial artifact) - # and pull them from the artifact cache. - try: - local_missing_blobs = cascache.missing_blobs(missing_blobs) - if local_missing_blobs: - artifactcache.fetch_missing_blobs(project, local_missing_blobs) - except (grpc.RpcError, BstError) as e: - raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e - - # Add command and action messages to blob list to push - missing_blobs.append(action.command_digest) - missing_blobs.append(action_digest) - - # Now, push the missing blobs to the remote. - try: - cascache.send_blobs(casremote, missing_blobs) - except grpc.RpcError as e: - raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e + # Check if any blobs are also missing locally (partial artifact) + # and pull them from the artifact cache. + try: + local_missing_blobs = cascache.missing_blobs(missing_blobs) + if local_missing_blobs: + artifactcache.fetch_missing_blobs(project, local_missing_blobs) + except (grpc.RpcError, BstError) as e: + raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e + + # Add command and action messages to blob list to push + missing_blobs.append(action.command_digest) + missing_blobs.append(action_digest) + + # Now, push the missing blobs to the remote. + try: + cascache.send_blobs(casremote, missing_blobs) + except grpc.RpcError as e: + raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e # Now request to execute the action channel = self.exec_spec.open_channel() @@ -207,17 +214,16 @@ class SandboxRemote(SandboxREAPI): action_result = self._extract_action_result(operation) # Fetch outputs - with CASRemote(self.storage_spec, cascache) as casremote: - for output_directory in action_result.output_directories: - tree_digest = output_directory.tree_digest - if tree_digest is None or not tree_digest.hash: - raise SandboxError("Output directory structure had no digest attached.") + for output_directory in action_result.output_directories: + tree_digest = output_directory.tree_digest + if tree_digest is None or not tree_digest.hash: + raise SandboxError("Output directory structure had no digest attached.") - # Now do a pull to ensure we have the full directory structure. - cascache.pull_tree(casremote, tree_digest) + # Now do a pull to ensure we have the full directory structure. + cascache.pull_tree(casremote, tree_digest) - # Fetch stdout and stderr blobs - cascache.fetch_blobs(casremote, [action_result.stdout_digest, action_result.stderr_digest]) + # Fetch stdout and stderr blobs + cascache.fetch_blobs(casremote, [action_result.stdout_digest, action_result.stderr_digest]) # Forward remote stdout and stderr if stdout:
