This is an automated email from the ASF dual-hosted git repository. root pushed a commit to branch tlater/casd-socket-permissions in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 502ea5c578401794a93b8898074bbb1747c6e0ae Author: Tristan Maat <[email protected]> AuthorDate: Tue Oct 29 17:01:53 2019 +0000 casserver.py: Use FetchTree instead of directly updating mtimes --- src/buildstream/_cas/casserver.py | 92 +++++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 29 deletions(-) diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py index e0063d1..5af610f 100644 --- a/src/buildstream/_cas/casserver.py +++ b/src/buildstream/_cas/casserver.py @@ -35,11 +35,20 @@ import grpc from google.protobuf.message import DecodeError import click -from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc +from .._protos.build.bazel.remote.execution.v2 import ( + remote_execution_pb2, + remote_execution_pb2_grpc, +) from .._protos.google.bytestream import bytestream_pb2_grpc -from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \ - artifact_pb2, artifact_pb2_grpc, source_pb2, source_pb2_grpc - +from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc +from .._protos.buildstream.v2 import ( + buildstream_pb2, + buildstream_pb2_grpc, + artifact_pb2, + artifact_pb2_grpc, + source_pb2, + source_pb2_grpc, +) from ..utils import save_file_atomic, get_host_tool @@ -57,6 +66,7 @@ class CASRemote: def __init__(self, url: str): self._url = url + self._local_cas = None self._bytestream = None self._cas = None @@ -94,6 +104,7 @@ class CASRemote: raise # Set up the RPC stubs + self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._channel) self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._channel) self._cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._channel) @@ -102,6 +113,11 @@ class CASRemote: assert self._cas is not None, "CAS stub was not initialized" return self._cas + def get_local_cas(self) -> local_cas_pb2_grpc.LocalContentAddressableStorageStub: + self._initialize_remote() + assert self._local_cas is not None, "Local CAS stub was not initialized" + return self._local_cas + def get_bytestream(self) -> bytestream_pb2_grpc.ByteStreamStub: self._initialize_remote() assert self._bytestream is not None, "Bytestream stub was not initialized" @@ -618,19 +634,20 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): self.logger.debug("GetReference") response = buildstream_pb2.GetReferenceResponse() + request = remote_execution_pb2.FindMissingBlobsRequest() + d = request.blob_digests.add() + d.CopyFrom(request.key) + try: - tree = self.cas_cache.resolve_ref(request.key) - try: - self.cas_cache.update_tree_mtime(tree) - except FileNotFoundError: + ref = self.cas.FindMissingBlobs(request) + except grpc.RpcError as err: + context.set_code(err.code()) + if err.code() == grpc.StatusCode.NOT_FOUND: self.cas_cache.remove_ref(request.key) - context.set_code(grpc.StatusCode.NOT_FOUND) - return response + return response - response.digest.hash = tree.hash - response.digest.size_bytes = tree.size_bytes - except FileNotFoundError: - context.set_code(grpc.StatusCode.NOT_FOUND) + response.digest.hash = ref.hash + response.digest.size_bytes = ref.size_bytes return response @@ -661,6 +678,7 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): def __init__(self, remote, root, cas_cache, *, update_cas=True): super().__init__() self.cas = remote.get_cas() + self.local_cas = remote.get_local_cas() self.cas_cache = cas_cache self.artifactdir = os.path.join(root, 'artifacts', 'refs') self.update_cas = update_cas @@ -677,6 +695,8 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): with open(artifact_path, 'rb') as f: artifact.ParseFromString(f.read()) + os.utime(artifact_path) + # Artifact-only servers will not have blobs on their system, # so we can't reasonably update their mtimes. Instead, we exit # early, and let the CAS server deal with its blobs. @@ -695,28 +715,42 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): try: if str(artifact.files): - self.cas_cache.update_tree_mtime(artifact.files) + request = local_cas_pb2.FetchTreeRequest() + request.root_digest.CopyFrom(artifact.files) + request.fetch_file_blobs = True + self.local_cas.FetchTree(request) if str(artifact.buildtree): - # buildtrees might not be there try: - self.cas_cache.update_tree_mtime(artifact.buildtree) - except FileNotFoundError: - pass + request = local_cas_pb2.FetchTreeRequest() + request.root_digest.CopyFrom(artifact.buildtree) + request.fetch_file_blobs = True + self.local_cas.FetchTree(request) + except grpc.RpcError as err: + # buildtrees might not be there + if err.code() != grpc.StatusCode.NOT_FOUND: + raise if str(artifact.public_data): - os.utime(self.cas_cache.object_path(artifact.public_data)) + request = remote_execution_pb2.FindMissingBlobsRequest() + d = request.blob_digests.add() + d.CopyFrom(artifact.public_data) + self.cas.FindMissingBlobs(request) + request = remote_execution_pb2.FindMissingBlobsRequest() for log_file in artifact.logs: - os.utime(self.cas_cache.object_path(log_file.digest)) - - except FileNotFoundError: - os.unlink(artifact_path) - context.abort(grpc.StatusCode.NOT_FOUND, - "Artifact files incomplete") - except DecodeError: - context.abort(grpc.StatusCode.NOT_FOUND, - "Artifact files not valid") + d = request.blob_digests.add() + d.CopyFrom(log_file.digest) + self.cas.FindMissingBlobs(request) + + except grpc.RpcError as err: + if err.code() == grpc.StatusCode.NOT_FOUND: + os.unlink(artifact_path) + context.abort(grpc.StatusCode.NOT_FOUND, + "Artifact files incomplete") + else: + context.abort(grpc.StatusCode.NOT_FOUND, + "Artifact files not valid") return artifact
