This is an automated email from the ASF dual-hosted git repository. juergbi pushed a commit to branch juerg/buildbox-asset-remote in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit c1defd0dbe8affb00951bf752a19372358802b73 Author: Jürg Billeter <[email protected]> AuthorDate: Fri May 31 14:07:52 2024 +0200 _assetcache.py: Use buildbox-casd as remote asset proxy, if supported buildbox-casd is already used as CAS proxy. Using buildbox-casd for all remote connections aims to improve robustness (e.g., consistent retry behavior) and will allow adding support for token-based authentication. --- src/buildstream/_assetcache.py | 60 ++++++++++++++++++++++++++++-------------- src/buildstream/_remote.py | 3 --- 2 files changed, 40 insertions(+), 23 deletions(-) diff --git a/src/buildstream/_assetcache.py b/src/buildstream/_assetcache.py index 6a4806341..6f9dcba7c 100644 --- a/src/buildstream/_assetcache.py +++ b/src/buildstream/_assetcache.py @@ -20,17 +20,20 @@ from typing import List, Dict, Tuple, Iterable, Optional import grpc from . import utils -from ._cas import CASRemote, CASCache +from ._cas import CASRemote, CASCache, CASDProcessManager from ._exceptions import AssetCacheError, RemoteError from ._remotespec import RemoteSpec, RemoteType from ._remote import BaseRemote from ._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc +from ._protos.build.buildgrid import local_cas_pb2 from ._protos.google.rpc import code_pb2 class AssetRemote(BaseRemote): - def __init__(self, spec): + def __init__(self, spec, casd): super().__init__(spec) + self.casd = casd + self.instance_name = None self.fetch_service = None self.push_service = None @@ -40,9 +43,24 @@ class AssetRemote(BaseRemote): super().close() def _configure_protocols(self): - # set up remote asset stubs - self.fetch_service = remote_asset_pb2_grpc.FetchStub(self.channel) - self.push_service = remote_asset_pb2_grpc.PushStub(self.channel) + local_cas = self.casd.get_local_cas() + request = local_cas_pb2.GetInstanceNameForRemotesRequest() + request.remote_asset.CopyFrom(self.spec.to_localcas_remote()) + try: + response = local_cas.GetInstanceNameForRemotes(request) + self.instance_name = response.instance_name + self.fetch_service = self.casd.get_asset_fetch() + self.push_service = self.casd.get_asset_push() + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.UNIMPLEMENTED or e.code() == grpc.StatusCode.INVALID_ARGUMENT: + # buildbox-casd is too old to support asset-only remotes. + # Fall back to direct connection. + self.instance_name = self.spec.instance_name + self.channel = self.spec.open_channel() + self.fetch_service = remote_asset_pb2_grpc.FetchStub(self.channel) + self.push_service = remote_asset_pb2_grpc.PushStub(self.channel) + else: + raise # _check(): # @@ -55,8 +73,8 @@ class AssetRemote(BaseRemote): # def _check(self): request = remote_asset_pb2.FetchBlobRequest() - if self.spec.instance_name: - request.instance_name = self.spec.instance_name + if self.instance_name: + request.instance_name = self.instance_name try: self.fetch_service.FetchBlob(request) @@ -74,8 +92,8 @@ class AssetRemote(BaseRemote): if self.spec.push: request = remote_asset_pb2.PushBlobRequest() - if self.spec.instance_name: - request.instance_name = self.spec.instance_name + if self.instance_name: + request.instance_name = self.instance_name try: self.push_service.PushBlob(request) @@ -112,8 +130,8 @@ class AssetRemote(BaseRemote): # def fetch_blob(self, uris, *, qualifiers=None): request = remote_asset_pb2.FetchBlobRequest() - if self.spec.instance_name: - request.instance_name = self.spec.instance_name + if self.instance_name: + request.instance_name = self.instance_name request.uris.extend(uris) if qualifiers: request.qualifiers.extend(qualifiers) @@ -153,8 +171,8 @@ class AssetRemote(BaseRemote): # def fetch_directory(self, uris, *, qualifiers=None): request = remote_asset_pb2.FetchDirectoryRequest() - if self.spec.instance_name: - request.instance_name = self.spec.instance_name + if self.instance_name: + request.instance_name = self.instance_name request.uris.extend(uris) if qualifiers: request.qualifiers.extend(qualifiers) @@ -194,8 +212,8 @@ class AssetRemote(BaseRemote): # def push_blob(self, uris, blob_digest, *, qualifiers=None, references_blobs=None, references_directories=None): request = remote_asset_pb2.PushBlobRequest() - if self.spec.instance_name: - request.instance_name = self.spec.instance_name + if self.instance_name: + request.instance_name = self.instance_name request.uris.extend(uris) request.blob_digest.CopyFrom(blob_digest) if qualifiers: @@ -231,8 +249,8 @@ class AssetRemote(BaseRemote): self, uris, directory_digest, *, qualifiers=None, references_blobs=None, references_directories=None ): request = remote_asset_pb2.PushDirectoryRequest() - if self.spec.instance_name: - request.instance_name = self.spec.instance_name + if self.instance_name: + request.instance_name = self.instance_name request.uris.extend(uris) request.root_directory_digest.CopyFrom(directory_digest) if qualifiers: @@ -262,14 +280,14 @@ class AssetRemote(BaseRemote): # to establish a connection to this remote at initialization time. # class RemotePair: - def __init__(self, cas: CASCache, spec: RemoteSpec): + def __init__(self, casd: CASDProcessManager, cas: CASCache, spec: RemoteSpec): self.index: Optional[AssetRemote] = None self.storage: Optional[CASRemote] = None self.error: Optional[str] = None try: if spec.remote_type in [RemoteType.INDEX, RemoteType.ALL]: - index = AssetRemote(spec) + index = AssetRemote(spec, casd) index.check() self.index = index if spec.remote_type in [RemoteType.STORAGE, RemoteType.ALL]: @@ -324,6 +342,8 @@ class AssetCache: # Hold on to the project specs self._project_specs = project_specs + casd = self.context.get_casd() + for spec in specs: # This can be called multiple times, ensure that we only try # to instantiate each remote once. @@ -331,7 +351,7 @@ class AssetCache: if spec in self._remotes: continue - remote = RemotePair(self.cas, spec) + remote = RemotePair(casd, self.cas, spec) if remote.error: self.context.messenger.warn("Failed to initialize remote {}: {}".format(spec.url, remote.error)) diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py index 453f7f2a0..32584de09 100644 --- a/src/buildstream/_remote.py +++ b/src/buildstream/_remote.py @@ -68,9 +68,6 @@ class BaseRemote: if self._initialized: return - if self.spec: - self.channel = self.spec.open_channel() - self._configure_protocols() self._initialized = True
