This is an automated email from the ASF dual-hosted git repository.

juergbi pushed a commit to branch juerg/re-proxy
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit d836828588368b19a89ba517c8864cd6a2c2c268
Author: Jürg Billeter <[email protected]>
AuthorDate: Sat Jun 22 11:57:33 2024 +0200

    _sandboxremote.py: Use buildbox-casd as remote execution proxy
    
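    buildbox-casd is already used as the CAS and Remote Asset (RA) proxy.
    Using buildbox-casd for all remote connections aims to improve
    robustness (e.g., consistent retry behavior) and will allow adding
    support for token-based authentication. If buildbox-casd is too old
    to proxy execution or action cache remotes, fall back to a direct
    connection.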
---
 src/buildstream/sandbox/_sandboxremote.py | 142 ++++++++++++++++++++++++------
 1 file changed, 113 insertions(+), 29 deletions(-)

diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index 1e8598d6c..ef8ef7de8 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -15,19 +15,84 @@
 #        Jim MacArthur <[email protected]>
 
 import shutil
-from functools import partial
 
 import grpc
 
 from ._sandboxreapi import SandboxREAPI
 from .. import _signals
+from .._remote import BaseRemote
 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, 
remote_execution_pb2_grpc
+from .._protos.build.buildgrid import local_cas_pb2
 from .._protos.google.rpc import code_pb2
 from .._exceptions import BstError, SandboxError
 from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
 from .._cas import CASRemote
 
 
+class ExecutionRemote(BaseRemote):
+    def __init__(self, spec, casd):
+        super().__init__(spec)
+        self.casd = casd
+        self.instance_name = None
+        self.exec_service = None
+        self.operations_service = None
+
+    def close(self):
+        self.exec_service = None
+        self.operations_service = None
+        super().close()
+
+    def _configure_protocols(self):
+        local_cas = self.casd.get_local_cas()
+        request = local_cas_pb2.GetInstanceNameForRemotesRequest()
+        self.spec.to_localcas_remote(request.execution)
+        try:
+            response = local_cas.GetInstanceNameForRemotes(request)
+            self.instance_name = response.instance_name
+            self.exec_service = self.casd.get_exec_service()
+            self.operations_service = self.casd.get_operations_service()
+        except grpc.RpcError as e:
+            if e.code() == grpc.StatusCode.UNIMPLEMENTED or e.code() == 
grpc.StatusCode.INVALID_ARGUMENT:
+                # buildbox-casd is too old to support execution service remotes.
+                # Fall back to direct connection.
+                self.instance_name = self.spec.instance_name
+                self.channel = self.spec.open_channel()
+                self.exec_service = 
remote_execution_pb2_grpc.ExecutionStub(self.channel)
+                self.operations_service = 
operations_pb2_grpc.OperationsStub(self.channel)
+            else:
+                raise
+
+
+class ActionCacheRemote(BaseRemote):
+    def __init__(self, spec, casd):
+        super().__init__(spec)
+        self.casd = casd
+        self.instance_name = None
+        self.ac_service = None
+
+    def close(self):
+        self.ac_service = None
+        super().close()
+
+    def _configure_protocols(self):
+        local_cas = self.casd.get_local_cas()
+        request = local_cas_pb2.GetInstanceNameForRemotesRequest()
+        self.spec.to_localcas_remote(request.action_cache)
+        try:
+            response = local_cas.GetInstanceNameForRemotes(request)
+            self.instance_name = response.instance_name
+            self.ac_service = self.casd.get_ac_service()
+        except grpc.RpcError as e:
+            if e.code() == grpc.StatusCode.UNIMPLEMENTED or e.code() == 
grpc.StatusCode.INVALID_ARGUMENT:
+                # buildbox-casd is too old to support action cache remotes.
+                # Fall back to direct connection.
+                self.instance_name = self.spec.instance_name
+                self.channel = self.spec.open_channel()
+                self.ac_service = 
remote_execution_pb2_grpc.ActionCacheStub(self.channel)
+            else:
+                raise
+
+
 # SandboxRemote()
 #
 # This isn't really a sandbox, it's a stub which sends all the sources and build
@@ -39,6 +104,7 @@ class SandboxRemote(SandboxREAPI):
 
         context = self._get_context()
         cascache = context.get_cascache()
+        casd = context.get_casd()
 
         specs = context.remote_execution_specs
         if specs is None:
@@ -62,15 +128,33 @@ class SandboxRemote(SandboxREAPI):
             self.own_storage_remote = False
             self.storage_remote = cascache.get_default_remote()
 
-    def run_remote_command(self, channel, action_digest):
+        self.exec_remote = ExecutionRemote(self.exec_spec, casd)
+        try:
+            self.exec_remote.init()
+        except grpc.RpcError as e:
+            raise SandboxError(
+                "Failed to contact remote execution service at {}: 
{}".format(self.exec_spec.url, e)
+            ) from e
+
+        if self.action_spec:
+            self.ac_remote = ActionCacheRemote(self.action_spec, casd)
+            try:
+                self.ac_remote.init()
+            except grpc.RpcError as e:
+                raise SandboxError(
+                    "Failed to contact action cache service at {}: 
{}".format(self.action_spec.url, e)
+                ) from e
+        else:
+            self.ac_remote = None
+
+    def run_remote_command(self, action_digest):
         # Sends an execution request to the remote execution server.
         #
         # This function blocks until it gets a response from the server.
 
-        # Try to create a communication channel to the BuildGrid server.
-        stub = remote_execution_pb2_grpc.ExecutionStub(channel)
+        stub = self.exec_remote.exec_service
         request = remote_execution_pb2.ExecuteRequest(
-            instance_name=self.exec_spec.instance_name, 
action_digest=action_digest, skip_cache_lookup=False
+            instance_name=self.exec_remote.instance_name, 
action_digest=action_digest, skip_cache_lookup=False
         )
 
         def __run_remote_command(stub, execute_request=None, 
running_operation=None):
@@ -117,7 +201,7 @@ class SandboxRemote(SandboxREAPI):
         operation = None
         with self._get_context().messenger.timed_activity(
             "Waiting for the remote build to complete", 
element_name=self._get_element_name()
-        ), _signals.terminator(partial(self.cancel_operation, channel)):
+        ), _signals.terminator(self.cancel_operation):
             operation = __run_remote_command(stub, execute_request=request)
             if operation is None:
                 return None
@@ -128,12 +212,12 @@ class SandboxRemote(SandboxREAPI):
 
         return operation
 
-    def cancel_operation(self, channel):
+    def cancel_operation(self):
         # If we don't have the name can't send request.
         if self.operation_name is None:
             return
 
-        stub = operations_pb2_grpc.OperationsStub(channel)
+        stub = self.exec_remote.operations_service
         request = 
operations_pb2.CancelOperationRequest(name=str(self.operation_name))
 
         try:
@@ -205,10 +289,8 @@ class SandboxRemote(SandboxREAPI):
                     raise SandboxError("Failed to push source directory to 
remote: {}".format(e)) from e
 
             # Now request to execute the action
-            channel = self.exec_spec.open_channel()
-            with channel:
-                operation = self.run_remote_command(channel, action_digest)
-                action_result = self._extract_action_result(operation)
+            operation = self.run_remote_command(action_digest)
+            action_result = self._extract_action_result(operation)
 
         # Fetch outputs
         for output_directory in action_result.output_directories:
@@ -243,25 +325,23 @@ class SandboxRemote(SandboxREAPI):
         #
         # Should return either the action response or None if not found, raise
         # Sandboxerror if other grpc error was raised
-        if not self.action_spec:
+        if not self.ac_remote:
             return None
 
-        channel = self.action_spec.open_channel()
-        with channel:
-            request = remote_execution_pb2.GetActionResultRequest(
-                instance_name=self.action_spec.instance_name, 
action_digest=action_digest
-            )
-            stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
-            try:
-                result = stub.GetActionResult(request)
-            except grpc.RpcError as e:
-                if e.code() != grpc.StatusCode.NOT_FOUND:
-                    raise SandboxError("Failed to query action cache: {} 
({})".format(e.code(), e.details()))
-                return None
-            else:
-                context = self._get_context()
-                context.messenger.info("Action result found in action cache", 
element_name=self._get_element_name())
-                return result
+        request = remote_execution_pb2.GetActionResultRequest(
+            instance_name=self.ac_remote.instance_name, 
action_digest=action_digest
+        )
+        stub = self.ac_remote.ac_service
+        try:
+            result = stub.GetActionResult(request)
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.NOT_FOUND:
+                raise SandboxError("Failed to query action cache: {} 
({})".format(e.code(), e.details()))
+            return None
+        else:
+            context = self._get_context()
+            context.messenger.info("Action result found in action cache", 
element_name=self._get_element_name())
+            return result
 
     @staticmethod
     def _extract_action_result(operation):
@@ -288,5 +368,9 @@ class SandboxRemote(SandboxREAPI):
         return execution_response.result
 
     def _cleanup(self):
+        if self.ac_remote:
+            self.ac_remote.close()
+        if self.exec_remote:
+            self.exec_remote.close()
         if self.own_storage_remote:
             self.storage_remote.close()

Reply via email to