This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch tlater/casd-socket-permissions
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit e8ddfe1d755f0093305eb3b98d630db9ffaeca81
Author: Tristan Maat <[email protected]>
AuthorDate: Tue Oct 15 17:42:07 2019 +0100

    casserver.py: Move CASCache API to a smaller, local class
---
 src/buildstream/_cas/casserver.py | 225 +++++++++++++++++++++++++++++++++-----
 1 file changed, 198 insertions(+), 27 deletions(-)

diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index c0c62b0..4f07639 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -19,6 +19,7 @@
 
 from concurrent import futures
 from enum import Enum
+from typing import Set
 import contextlib
 import logging
 import os
@@ -41,15 +42,178 @@ from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
     artifact_pb2, artifact_pb2_grpc, source_pb2, source_pb2_grpc
 
 from .. import utils
-from .._exceptions import CASError, CASCacheError
 
-from .cascache import CASCache
 
 # The default limit for gRPC messages is 4 MiB.
 # Limit payload to 1 MiB to leave sufficient headroom for metadata.
 _MAX_PAYLOAD_BYTES = 1024 * 1024
 
 
+# CASCache:
+#
+# A slimmed down version of `buildstream._cas.cascache.CASCache` -
+# exposes exactly the bits of interface we need to update objects on
+# access.
+#
+# Note: This class *is* somewhat specialized and doesn't exactly do
+# what `buildstream._cas.cascache.CASCache` does anymore.
+#
+# Ideally this should be supported by buildbox-casd in the future.
+#
+class CASCache:
+    def __init__(self, root: str):
+        self.root = root
+        self.casdir = os.path.join(root, "cas")
+        self.tmpdir = os.path.join(root, "tmp")
+
+    # ref_path():
+    #
+    # Get the path to a ref's file.
+    #
+    # Args:
+    #     ref - The name of the ref. NOTE(review): it is joined
+    #           unvalidated, so '..' or absolute refs escape refs/heads.
+    #
+    # Returns:
+    #     str - The path to the ref's file.
+    def ref_path(self, ref: str) -> str:
+        return os.path.join(self.casdir, 'refs', 'heads', ref)
+
+    # object_path():
+    #
+    # Get the path to an object's file.
+    #
+    # Args:
+    #     digest - The digest of the object.
+    #
+    # Returns:
+    #     str - The path to the object's file.
+    #
+    def object_path(self, digest) -> str:
+        return os.path.join(self.casdir, 'objects', digest.hash[:2], digest.hash[2:])
+
+    # remove_ref():
+    #
+    # Remove a ref file, then prune any of its parent directories
+    # that have become empty (never the refs base directory itself).
+    #
+    # Args:
+    #     ref - The name of the ref.
+    #
+    # Raises:
+    #     FileNotFoundError - If the ref doesn't exist.
+    def remove_ref(self, ref: str):
+        import errno
+
+        basedir = os.path.join(self.casdir, 'refs', 'heads')
+        os.unlink(self.ref_path(ref))
+
+        # Split into every component (os.path.split() would only
+        # separate the last one), so all emptied parents get pruned.
+        components = os.path.normpath(ref).split(os.sep)
+        while components:
+            components.pop()
+            refdir = os.path.join(basedir, *components)
+
+            # Break out once we reach the base
+            if refdir == basedir:
+                break
+
+            try:
+                os.rmdir(refdir)
+            except FileNotFoundError:
+                # The parent directory did not exist, but its
+                # parent directory might still be ready to prune
+                pass
+            except OSError as e:
+                # POSIX lets rmdir() report a non-empty dir with either errno
+                if e.errno in (errno.ENOTEMPTY, errno.EEXIST):
+                    # The parent directory was not empty, so we
+                    # cannot prune directories beyond this point
+                    break
+                raise
+
+    # set_ref():
+    #
+    # Create or update a ref with a new digest.
+    #
+    # Args:
+    #     ref - The ref of the digest.
+    #     tree - The digest to write.
+    #
+    def set_ref(self, ref: str, tree):
+        ref_path = self.ref_path(ref)
+
+        os.makedirs(os.path.dirname(ref_path), exist_ok=True)
+        with utils.save_file_atomic(ref_path, 'wb', tempdir=self.tmpdir) as f:
+            f.write(tree.SerializeToString())
+
+    # resolve_ref():
+    #
+    # Read a digest given its ref.
+    #
+    # Args:
+    #     ref - The ref of the digest.
+    #
+    # Returns:
+    #     remote_execution_pb2.Digest - The digest.
+    #
+    # Raises:
+    #     FileNotFoundError - If the ref doesn't exist.
+    def resolve_ref(self, ref: str):
+        digest = remote_execution_pb2.Digest()
+        with open(self.ref_path(ref), 'rb') as f:
+            digest.ParseFromString(f.read())
+        return digest
+
+    # resolve_digest():
+    #
+    # Read the directory corresponding to a digest.
+    #
+    # Args:
+    #     digest - The digest corresponding to a directory.
+    #
+    # Returns:
+    #     remote_execution_pb2.Directory - The directory.
+    #
+    # Raises:
+    #     FileNotFoundError - If the digest object doesn't exist.
+    def resolve_digest(self, digest):
+        directory = remote_execution_pb2.Directory()
+        with open(self.object_path(digest), 'rb') as f:
+            directory.ParseFromString(f.read())
+        return directory
+
+    # update_tree_mtime():
+    #
+    # Update the mtimes of all objects in a tree: every directory
+    # object and every file object it references, recursively.
+    # Subdirectories shared between directories are visited once.
+    #
+    # Args:
+    #     tree - The digest of the tree to update.
+    #
+    # Raises:
+    #     FileNotFoundError - If any of the tree's objects don't exist.
+    def update_tree_mtime(self, tree):
+        visited = set()  # type: Set[str]
+
+        def update_directory_node(node):
+            if node.hash in visited:
+                return
+
+            os.utime(self.object_path(node))
+            visited.add(node.hash)
+
+            directory = self.resolve_digest(node)
+            for filenode in directory.files:  # pylint: disable=no-member
+                os.utime(self.object_path(filenode.digest))
+            for dirnode in directory.directories:  # pylint: disable=no-member
+                update_directory_node(dirnode.digest)
+
+        update_directory_node(tree)
+
+
 # LogLevel():
 #
 # Represents the buildbox-casd log level.
@@ -211,13 +375,13 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.WA
     handler.setLevel(LogLevel.get_logging_equivalent(log_level))
     logger.addHandler(handler)
 
-    cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False)
     cas_runner = CASdRunner(os.path.abspath(repo), cache_quota=quota)
     cas_runner.start_casd()
+    cas_cache = CASCache(os.path.abspath(repo))
 
     try:
-        artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs')
-        sourcedir = os.path.join(os.path.abspath(repo), 'source_protos')
+        root = os.path.abspath(repo)
+        sourcedir = os.path.join(root, 'source_protos')
 
         # Use max_workers default from Python 3.5+
         max_workers = (os.cpu_count() or 1) * 5
@@ -234,10 +398,10 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.WA
             _CapabilitiesServicer(), server)
 
         buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
-            _ReferenceStorageServicer(cas, enable_push=enable_push), server)
+            _ReferenceStorageServicer(cas, cas_cache, enable_push=enable_push), server)
 
         artifact_pb2_grpc.add_ArtifactServiceServicer_to_server(
-            _ArtifactServicer(cas, artifactdir, update_cas=not index_only), server)
+            _ArtifactServicer(cas, root, cas_cache, update_cas=not index_only), server)
 
         source_pb2_grpc.add_SourceServiceServicer_to_server(
             _SourceServicer(sourcedir), server)
@@ -564,9 +728,10 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
 
 
 class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cas_cache, *, enable_push):
         super().__init__()
         self.cas = cas
+        self.cas_cache = cas_cache
         self.enable_push = enable_push
         self.logger = logging.getLogger("casserver")
 
@@ -575,17 +740,19 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
         response = buildstream_pb2.GetReferenceResponse()
 
         try:
-            tree = self.cas.resolve_ref(request.key, update_mtime=True)
+            tree = self.cas_cache.resolve_ref(request.key)
+            # Also mark the ref itself as accessed, not only its objects
+            os.utime(self.cas_cache.ref_path(request.key))
             try:
-                self.cas.update_tree_mtime(tree)
+                self.cas_cache.update_tree_mtime(tree)
             except FileNotFoundError:
-                self.cas.remove(request.key)
+                self.cas_cache.remove_ref(request.key)
                 context.set_code(grpc.StatusCode.NOT_FOUND)
                 return response
 
             response.digest.hash = tree.hash
             response.digest.size_bytes = tree.size_bytes
-        except CASError:
+        except FileNotFoundError:
             context.set_code(grpc.StatusCode.NOT_FOUND)
 
         return response
@@ -599,7 +764,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
             return response
 
         for key in request.keys:
-            self.cas.set_ref(key, request.digest)
+            self.cas_cache.set_ref(key, request.digest)
 
         return response
 
@@ -614,10 +779,11 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
 
 class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
 
-    def __init__(self, cas, artifactdir, *, update_cas=True):
+    def __init__(self, cas, root, cas_cache, *, update_cas=True):
         super().__init__()
         self.cas = cas
-        self.artifactdir = artifactdir
+        self.cas_cache = cas_cache
+        self.artifactdir = os.path.join(root, 'artifacts', 'refs')
         self.update_cas = update_cas
-        os.makedirs(artifactdir, exist_ok=True)
+        os.makedirs(self.artifactdir, exist_ok=True)
         self.logger = logging.getLogger("casserver")
@@ -651,20 +817,20 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
         try:
 
             if str(artifact.files):
-                self.cas.update_tree_mtime(artifact.files)
+                self.cas_cache.update_tree_mtime(artifact.files)
 
             if str(artifact.buildtree):
                 # buildtrees might not be there
                 try:
-                    self.cas.update_tree_mtime(artifact.buildtree)
+                    self.cas_cache.update_tree_mtime(artifact.buildtree)
                 except FileNotFoundError:
                     pass
 
             if str(artifact.public_data):
-                os.utime(self.cas.objpath(artifact.public_data))
+                os.utime(self.cas_cache.object_path(artifact.public_data))
 
             for log_file in artifact.logs:
-                os.utime(self.cas.objpath(log_file.digest))
+                os.utime(self.cas_cache.object_path(log_file.digest))
 
         except FileNotFoundError:
             os.unlink(artifact_path)
@@ -708,20 +874,25 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
 
     def _check_directory(self, name, digest, context):
         try:
-            directory = remote_execution_pb2.Directory()
-            with open(self.cas.objpath(digest), 'rb') as f:
-                directory.ParseFromString(f.read())
+            self.cas_cache.resolve_digest(digest)
         except FileNotFoundError:
-            self.logger.warning("Artifact %s specified but no files found (%s)", name, self.cas.objpath(digest))
-            context.abort(grpc.StatusCode.FAILED_PRECONDITION,
-                          "Artifact {} specified but no files found".format(name))
+            self.logger.warning(
+                "Artifact %s specified but no files found (%s)",
+                name,
+                self.cas_cache.object_path(digest))
+            context.abort(
+                grpc.StatusCode.FAILED_PRECONDITION,
+                "Artifact {} specified but no files found".format(name))
         except DecodeError:
-            self.logger.warning("Artifact %s specified but directory not found (%s)", name, self.cas.objpath(digest))
+            self.logger.warning(
+                "Artifact %s specified but directory not found (%s)",
+                name,
+                self.cas_cache.object_path(digest))
             context.abort(grpc.StatusCode.FAILED_PRECONDITION,
                           "Artifact {} specified but directory not found".format(name))
 
     def _check_file(self, name, digest, context):
-        if not os.path.exists(self.cas.objpath(digest)):
+        if not os.path.exists(self.cas_cache.object_path(digest)):
             context.abort(grpc.StatusCode.FAILED_PRECONDITION,
                           "Artifact {} specified but not found".format(name))
 

Reply via email to