This is an automated email from the ASF dual-hosted git repository.

juergbi pushed a commit to branch juerg/remote-cache-ci
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 0de2a8ab8f26b24cc7a6f48077ef77046545f811
Author: Jürg Billeter <[email protected]>
AuthorDate: Tue Oct 20 09:34:29 2020 +0200

    _sandboxremote.py: Make storage-service optional with remote cache
---
 src/buildstream/_context.py               |   2 +-
 src/buildstream/_remotespec.py            |  22 +++++--
 src/buildstream/sandbox/_sandboxremote.py | 102 ++++++++++++++++--------------
 3 files changed, 72 insertions(+), 54 deletions(-)

diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index 8c409ec..88ac785 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -749,4 +749,4 @@ class Context:
             os.environ["XDG_DATA_HOME"] = os.path.expanduser("~/.local/share")
 
     def _load_remote_execution(self, node: MappingNode) -> Optional[RemoteExecutionSpec]:
-        return RemoteExecutionSpec.new_from_node(node)
+        return RemoteExecutionSpec.new_from_node(node, remote_cache=bool(self.remote_cache_spec))
diff --git a/src/buildstream/_remotespec.py b/src/buildstream/_remotespec.py
index a37698e..ea8125a 100644
--- a/src/buildstream/_remotespec.py
+++ b/src/buildstream/_remotespec.py
@@ -454,9 +454,11 @@ class RemoteSpec:
 # communicate with various components of an RE build cluster.
 #
 class RemoteExecutionSpec:
-    def __init__(self, exec_spec: RemoteSpec, storage_spec: RemoteSpec, action_spec: Optional[RemoteSpec]) -> None:
+    def __init__(
+        self, exec_spec: RemoteSpec, storage_spec: Optional[RemoteSpec], action_spec: Optional[RemoteSpec]
+    ) -> None:
         self.exec_spec: RemoteSpec = exec_spec
-        self.storage_spec: RemoteSpec = storage_spec
+        self.storage_spec: Optional[RemoteSpec] = storage_spec
         self.action_spec: Optional[RemoteSpec] = action_spec
 
     # new_from_node():
@@ -474,15 +476,25 @@ class RemoteExecutionSpec:
     #    LoadError: If the node is malformed.
     #
     @classmethod
-    def new_from_node(cls, node: MappingNode, basedir: Optional[str] = None) -> "RemoteExecutionSpec":
+    def new_from_node(
+        cls, node: MappingNode, basedir: Optional[str] = None, *, remote_cache: bool = False
+    ) -> "RemoteExecutionSpec":
         node.validate_keys(["execution-service", "storage-service", "action-cache-service"])
 
         exec_node = node.get_mapping("execution-service")
-        storage_node = node.get_mapping("storage-service")
+        if remote_cache:
+            storage_node = node.get_mapping("storage-service", default=None)
+        else:
+            storage_node = node.get_mapping("storage-service")
         action_node = node.get_mapping("action-cache-service", default=None)
 
         exec_spec = RemoteSpec.new_from_node(exec_node, basedir, remote_execution=True)
-        storage_spec = RemoteSpec.new_from_node(storage_node, basedir, remote_execution=True)
+
+        storage_spec: Optional[RemoteSpec]
+        if storage_node:
+            storage_spec = RemoteSpec.new_from_node(storage_node, basedir, remote_execution=True)
+        else:
+            storage_spec = None
 
         action_spec: Optional[RemoteSpec]
         if action_node:
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index e0a3a2d..d1e1c8f 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -42,6 +42,8 @@ class SandboxRemote(SandboxREAPI):
         super().__init__(*args, **kwargs)
 
         context = self._get_context()
+        cascache = context.get_cascache()
+
         specs = context.remote_execution_specs
         if specs is None:
             return
@@ -51,6 +53,17 @@ class SandboxRemote(SandboxREAPI):
         self.action_spec = specs.action_spec
         self.operation_name = None
 
+        if self.storage_spec:
+            self.storage_remote = CASRemote(self.storage_spec, cascache)
+            try:
+                self.storage_remote.init()
+            except grpc.RpcError as e:
+                raise SandboxError(
+                    "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_spec.url, e)
+                ) from e
+        else:
+            self.storage_remote = cascache.get_default_remote()
+
     def run_remote_command(self, channel, action_digest):
         # Sends an execution request to the remote execution server.
         #
@@ -141,13 +154,13 @@ class SandboxRemote(SandboxREAPI):
         cascache = context.get_cascache()
 
         # Fetch the file blobs
-        dir_digest = vdir._get_digest()
-        required_blobs = cascache.required_blobs_for_directory(dir_digest)
+        if self.storage_spec:
+            dir_digest = vdir._get_digest()
+            required_blobs = cascache.required_blobs_for_directory(dir_digest)
 
-        local_missing_blobs = cascache.missing_blobs(required_blobs)
-        if local_missing_blobs:
-            with CASRemote(self.storage_spec, cascache) as casremote:
-                cascache.fetch_blobs(casremote, local_missing_blobs)
+            local_missing_blobs = cascache.missing_blobs(required_blobs)
+            if local_missing_blobs:
+                cascache.fetch_blobs(self.storage_remote, local_missing_blobs)
 
     def _execute_action(self, action, flags):
         stdout, stderr = self._get_output()
@@ -159,46 +172,40 @@ class SandboxRemote(SandboxREAPI):
 
         action_digest = cascache.add_object(buffer=action.SerializeToString())
 
+        casremote = self.storage_remote
+
         # check action cache download and download if there
         action_result = self._check_action_cache(action_digest)
 
         if not action_result:
-            with CASRemote(self.storage_spec, cascache) as casremote:
+            with self._get_context().messenger.timed_activity(
+                "Uploading input root", element_name=self._get_element_name()
+            ):
+                # Determine blobs missing on remote
                 try:
-                    casremote.init()
+                    input_root_digest = action.input_root_digest
+                    missing_blobs = list(cascache.missing_blobs_for_directory(input_root_digest, remote=casremote))
                 except grpc.RpcError as e:
-                    raise SandboxError(
-                        "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_spec.url, e)
-                    ) from e
+                    raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
 
-                with self._get_context().messenger.timed_activity(
-                    "Uploading input root", element_name=self._get_element_name()
-                ):
-                    # Determine blobs missing on remote
-                    try:
-                        input_root_digest = action.input_root_digest
-                        missing_blobs = list(cascache.missing_blobs_for_directory(input_root_digest, remote=casremote))
-                    except grpc.RpcError as e:
-                        raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
-
-                    # Check if any blobs are also missing locally (partial artifact)
-                    # and pull them from the artifact cache.
-                    try:
-                        local_missing_blobs = cascache.missing_blobs(missing_blobs)
-                        if local_missing_blobs:
-                            artifactcache.fetch_missing_blobs(project, local_missing_blobs)
-                    except (grpc.RpcError, BstError) as e:
-                        raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
-
-                    # Add command and action messages to blob list to push
-                    missing_blobs.append(action.command_digest)
-                    missing_blobs.append(action_digest)
-
-                    # Now, push the missing blobs to the remote.
-                    try:
-                        cascache.send_blobs(casremote, missing_blobs)
-                    except grpc.RpcError as e:
-                        raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
+                # Check if any blobs are also missing locally (partial artifact)
+                # and pull them from the artifact cache.
+                try:
+                    local_missing_blobs = cascache.missing_blobs(missing_blobs)
+                    if local_missing_blobs:
+                        artifactcache.fetch_missing_blobs(project, local_missing_blobs)
+                except (grpc.RpcError, BstError) as e:
+                    raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
+
+                # Add command and action messages to blob list to push
+                missing_blobs.append(action.command_digest)
+                missing_blobs.append(action_digest)
+
+                # Now, push the missing blobs to the remote.
+                try:
+                    cascache.send_blobs(casremote, missing_blobs)
+                except grpc.RpcError as e:
+                    raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
 
             # Now request to execute the action
             channel = self.exec_spec.open_channel()
@@ -207,17 +214,16 @@ class SandboxRemote(SandboxREAPI):
                 action_result = self._extract_action_result(operation)
 
         # Fetch outputs
-        with CASRemote(self.storage_spec, cascache) as casremote:
-            for output_directory in action_result.output_directories:
-                tree_digest = output_directory.tree_digest
-                if tree_digest is None or not tree_digest.hash:
-                    raise SandboxError("Output directory structure had no digest attached.")
+        for output_directory in action_result.output_directories:
+            tree_digest = output_directory.tree_digest
+            if tree_digest is None or not tree_digest.hash:
+                raise SandboxError("Output directory structure had no digest attached.")
 
-                # Now do a pull to ensure we have the full directory structure.
-                cascache.pull_tree(casremote, tree_digest)
+            # Now do a pull to ensure we have the full directory structure.
+            cascache.pull_tree(casremote, tree_digest)
 
-            # Fetch stdout and stderr blobs
-            cascache.fetch_blobs(casremote, [action_result.stdout_digest, action_result.stderr_digest])
+        # Fetch stdout and stderr blobs
+        cascache.fetch_blobs(casremote, [action_result.stdout_digest, action_result.stderr_digest])
 
         # Forward remote stdout and stderr
         if stdout:

Reply via email to