This is an automated email from the ASF dual-hosted git repository. tvb pushed a commit to branch tristan/remove-artifact-server in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 20bb5a346071b1c1f26d16d1058b227f2bf5598a Author: Tristan van Berkom <[email protected]> AuthorDate: Mon Oct 11 17:44:23 2021 +0900 Remove bst-artifact-server This fixes #1468 --- setup.py | 19 +- src/buildstream/_cas/casserver.py | 442 -------------------------------------- 2 files changed, 3 insertions(+), 458 deletions(-) diff --git a/setup.py b/setup.py index 4e29d76..753ec51 100755 --- a/setup.py +++ b/setup.py @@ -103,21 +103,6 @@ def list_testing_datafiles(): ##################################################### -# Conditional Checks # -##################################################### -# -# Because setuptools... there is no way to pass an option to -# the setup.py explicitly at install time. -# -# So screw it, lets just use an env var. -bst_install_entry_points = { - "console_scripts": ["bst-artifact-server = buildstream._cas.casserver:server_main"], -} - -if not os.environ.get("BST_ARTIFACTS_ONLY", ""): - bst_install_entry_points["console_scripts"] += ["bst = buildstream._frontend:cli"] - -##################################################### # Monkey-patching setuptools for performance # ##################################################### # @@ -384,7 +369,9 @@ setup( ("share/bash-completion/completions", [os.path.join("src", "buildstream", "data", "bst")]), ], install_requires=install_requires, - entry_points=bst_install_entry_points, + entry_points={ + "console_scripts": ["bst = buildstream._frontend:cli"] + }, ext_modules=cythonize( BUILD_EXTENSIONS, compiler_directives={ diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py deleted file mode 100644 index 1ea52fe..0000000 --- a/src/buildstream/_cas/casserver.py +++ /dev/null @@ -1,442 +0,0 @@ -# -# Copyright (C) 2018 Codethink Limited -# Copyright (C) 2020 Bloomberg Finance LP -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; 
either -# version 2 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library. If not, see <http://www.gnu.org/licenses/>. -# -# Authors: -# Jürg Billeter <[email protected]> - -from concurrent import futures -from enum import Enum -import contextlib -import logging -import os -import signal -import sys - -import grpc -import click - -from .._protos.build.bazel.remote.asset.v1 import remote_asset_pb2_grpc -from .. import _signals -from .._protos.build.bazel.remote.execution.v2 import ( - remote_execution_pb2, - remote_execution_pb2_grpc, -) -from .._protos.google.bytestream import bytestream_pb2_grpc -from .._protos.buildstream.v2 import ( - buildstream_pb2, - buildstream_pb2_grpc, -) - -# Note: We'd ideally like to avoid imports from the core codebase as -# much as possible, since we're expecting to eventually split this -# module off into its own project. -# -# Not enough that we'd like to duplicate code, but enough that we want -# to make it very obvious what we're using, so in this case we import -# the specific methods we'll be using. -from ..utils import save_file_atomic, _remove_path_with_parents -from .casdprocessmanager import CASDProcessManager - - -# The default limit for gRPC messages is 4 MiB. -# Limit payload to 1 MiB to leave sufficient headroom for metadata. -_MAX_PAYLOAD_BYTES = 1024 * 1024 - - -# LogLevel(): -# -# Manage log level choices using click. -# -class LogLevel(click.Choice): - # Levels(): - # - # Represents the actual buildbox-casd log level. 
- # - class Levels(Enum): - WARNING = "warning" - INFO = "info" - TRACE = "trace" - - def __init__(self): - super().__init__([m.lower() for m in LogLevel.Levels._member_names_]) # pylint: disable=no-member - - def convert(self, value, param, ctx) -> "LogLevel.Levels": - if isinstance(value, LogLevel.Levels): - value = value.value - - return LogLevel.Levels(super().convert(value, param, ctx)) - - @classmethod - def get_logging_equivalent(cls, level) -> int: - equivalents = { - cls.Levels.WARNING: logging.WARNING, - cls.Levels.INFO: logging.INFO, - cls.Levels.TRACE: logging.DEBUG, - } - - return equivalents[level] - - -# create_server(): -# -# Create gRPC CAS artifact server as specified in the Remote Execution API. -# -# Args: -# repo (str): Path to CAS repository -# enable_push (bool): Whether to allow blob uploads and artifact updates -# index_only (bool): Whether to store CAS blobs or only artifacts -# -@contextlib.contextmanager -def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.Levels.WARNING): - logger = logging.getLogger("buildstream._cas.casserver") - logger.setLevel(LogLevel.get_logging_equivalent(log_level)) - handler = logging.StreamHandler(sys.stderr) - handler.setFormatter(logging.Formatter(fmt="%(levelname)s: %(funcName)s: %(message)s")) - logger.addHandler(handler) - - casd_manager = CASDProcessManager( - os.path.abspath(repo), os.path.join(os.path.abspath(repo), "logs"), log_level, quota, None, False - ) - casd_channel = casd_manager.create_channel() - - try: - root = os.path.abspath(repo) - - # Use max_workers default from Python 3.5+ - max_workers = (os.cpu_count() or 1) * 5 - server = grpc.server(futures.ThreadPoolExecutor(max_workers)) - - if not index_only: - bytestream_pb2_grpc.add_ByteStreamServicer_to_server( - _ByteStreamServicer(casd_channel, enable_push=enable_push), server - ) - - remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server( - _ContentAddressableStorageServicer(casd_channel, 
enable_push=enable_push), server - ) - - remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(_CapabilitiesServicer(), server) - - # Remote Asset API - remote_asset_pb2_grpc.add_FetchServicer_to_server(_FetchServicer(casd_channel), server) - if enable_push: - remote_asset_pb2_grpc.add_PushServicer_to_server(_PushServicer(casd_channel), server) - - # BuildStream protocols - buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server( - _ReferenceStorageServicer(casd_channel, root, enable_push=enable_push), server - ) - - # Ensure we have the signal handler set for SIGTERM - # This allows threads from GRPC to call our methods that do register - # handlers at exit. - with _signals.terminator(lambda: None): - yield server - - finally: - casd_channel.close() - casd_manager.release_resources() - - -@click.command(short_help="CAS Artifact Server") -@click.option("--port", "-p", type=click.INT, required=True, help="Port number") -@click.option("--server-key", help="Private server key for TLS (PEM-encoded)") -@click.option("--server-cert", help="Public server certificate for TLS (PEM-encoded)") -@click.option("--client-certs", help="Public client certificates for TLS (PEM-encoded)") -@click.option("--enable-push", is_flag=True, help="Allow clients to upload blobs and update artifact cache") -@click.option("--quota", type=click.INT, default=10e9, show_default=True, help="Maximum disk usage in bytes") -@click.option( - "--index-only", - is_flag=True, - help='Only provide the BuildStream artifact and source services ("index"), not the CAS ("storage")', -) -@click.option("--log-level", type=LogLevel(), help="The log level to launch with", default="warning") -@click.argument("repo") -def server_main(repo, port, server_key, server_cert, client_certs, enable_push, quota, index_only, log_level): - # Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception, - # properly executing cleanup code in `finally` clauses and context 
managers. - # This is required to terminate buildbox-casd on SIGTERM. - signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0)) - - with create_server( - repo, quota=quota, enable_push=enable_push, index_only=index_only, log_level=log_level - ) as server: - - use_tls = bool(server_key) - - if bool(server_cert) != use_tls: - click.echo("ERROR: --server-key and --server-cert are both required for TLS", err=True) - sys.exit(-1) - - if client_certs and not use_tls: - click.echo("ERROR: --client-certs can only be used with --server-key", err=True) - sys.exit(-1) - - if use_tls: - # Read public/private key pair - with open(server_key, "rb") as f: - server_key_bytes = f.read() - with open(server_cert, "rb") as f: - server_cert_bytes = f.read() - - if client_certs: - with open(client_certs, "rb") as f: - client_certs_bytes = f.read() - else: - client_certs_bytes = None - - credentials = grpc.ssl_server_credentials( - [(server_key_bytes, server_cert_bytes)], - root_certificates=client_certs_bytes, - require_client_auth=bool(client_certs), - ) - server.add_secure_port("[::]:{}".format(port), credentials) - else: - server.add_insecure_port("[::]:{}".format(port)) - - # Run artifact server - server.start() - try: - while True: - signal.pause() - finally: - server.stop(0) - - -class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): - def __init__(self, casd, *, enable_push): - super().__init__() - self.bytestream = casd.get_bytestream() - self.enable_push = enable_push - self.logger = logging.getLogger("buildstream._cas.casserver") - - def Read(self, request, context): - self.logger.debug("Reading %s", request.resource_name) - try: - ret = self.bytestream.Read(request) - except grpc.RpcError as err: - context.abort(err.code(), err.details()) - return ret - - def Write(self, request_iterator, context): - # Note that we can't easily give more information because the - # data is stuck in an iterator that will be consumed if read. 
- self.logger.debug("Writing data") - try: - ret = self.bytestream.Write(request_iterator) - except grpc.RpcError as err: - context.abort(err.code(), err.details()) - return ret - - -class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer): - def __init__(self, casd, *, enable_push): - super().__init__() - self.cas = casd.get_cas() - self.enable_push = enable_push - self.logger = logging.getLogger("buildstream._cas.casserver") - - def FindMissingBlobs(self, request, context): - self.logger.info("Finding '%s'", request.blob_digests) - try: - ret = self.cas.FindMissingBlobs(request) - except grpc.RpcError as err: - context.abort(err.code(), err.details()) - return ret - - def BatchReadBlobs(self, request, context): - self.logger.info("Reading '%s'", request.digests) - try: - ret = self.cas.BatchReadBlobs(request) - except grpc.RpcError as err: - context.abort(err.code(), err.details()) - return ret - - def BatchUpdateBlobs(self, request, context): - self.logger.info("Updating: '%s'", [request.digest for request in request.requests]) - try: - ret = self.cas.BatchUpdateBlobs(request) - except grpc.RpcError as err: - context.abort(err.code(), err.details()) - return ret - - -class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer): - def __init__(self): - self.logger = logging.getLogger("buildstream._cas.casserver") - - def GetCapabilities(self, request, context): - self.logger.info("Retrieving capabilities") - response = remote_execution_pb2.ServerCapabilities() - - cache_capabilities = response.cache_capabilities - cache_capabilities.digest_function.append(remote_execution_pb2.DigestFunction.SHA256) - cache_capabilities.action_cache_update_capabilities.update_enabled = False - cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES - cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.SymlinkAbsolutePathStrategy.ALLOWED - - response.deprecated_api_version.major = 2 - 
response.low_api_version.major = 2 - response.high_api_version.major = 2 - - return response - - -class _FetchServicer(remote_asset_pb2_grpc.FetchServicer): - def __init__(self, casd): - super().__init__() - self.fetch = casd.get_asset_fetch() - self.logger = logging.getLogger("buildstream._cas.casserver") - - def FetchBlob(self, request, context): - self.logger.debug("FetchBlob '%s'", request.uris) - try: - ret = self.fetch.FetchBlob(request) - except grpc.RpcError as err: - context.abort(err.code(), err.details()) - return ret - - def FetchDirectory(self, request, context): - self.logger.debug("FetchDirectory '%s'", request.uris) - try: - ret = self.fetch.FetchDirectory(request) - except grpc.RpcError as err: - context.abort(err.code(), err.details()) - return ret - - -class _PushServicer(remote_asset_pb2_grpc.PushServicer): - def __init__(self, casd): - super().__init__() - self.push = casd.get_asset_push() - self.logger = logging.getLogger("buildstream._cas.casserver") - - def PushBlob(self, request, context): - self.logger.debug("PushBlob '%s'", request.uris) - try: - ret = self.push.PushBlob(request) - except grpc.RpcError as err: - context.abort(err.code(), err.details()) - return ret - - def PushDirectory(self, request, context): - self.logger.debug("PushDirectory '%s'", request.uris) - try: - ret = self.push.PushDirectory(request) - except grpc.RpcError as err: - context.abort(err.code(), err.details()) - return ret - - -class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): - def __init__(self, casd, cas_root, *, enable_push): - super().__init__() - self.cas = casd.get_cas() - self.root = cas_root - self.enable_push = enable_push - self.logger = logging.getLogger("buildstream._cas.casserver") - self.tmpdir = os.path.join(self.root, "tmp") - self.casdir = os.path.join(self.root, "cas") - self.refdir = os.path.join(self.casdir, "refs", "heads") - os.makedirs(self.tmpdir, exist_ok=True) - - # ref_path(): - # - # Get the path to a 
digest's file. - # - # Args: - # ref - The ref of the digest. - # - # Returns: - # str - The path to the digest's file. - # - def ref_path(self, ref: str) -> str: - return os.path.join(self.refdir, ref) - - # set_ref(): - # - # Create or update a ref with a new digest. - # - # Args: - # ref - The ref of the digest. - # tree - The digest to write. - # - def set_ref(self, ref: str, tree): - ref_path = self.ref_path(ref) - - os.makedirs(os.path.dirname(ref_path), exist_ok=True) - with save_file_atomic(ref_path, "wb", tempdir=self.tmpdir) as f: - f.write(tree.SerializeToString()) - - # resolve_ref(): - # - # Resolve a ref to a digest. - # - # Args: - # ref (str): The name of the ref - # - # Returns: - # (Digest): The digest stored in the ref - # - def resolve_ref(self, ref): - ref_path = self.ref_path(ref) - - with open(ref_path, "rb") as f: - os.utime(ref_path) - - digest = remote_execution_pb2.Digest() - digest.ParseFromString(f.read()) - return digest - - def GetReference(self, request, context): - self.logger.debug("'%s'", request.key) - response = buildstream_pb2.GetReferenceResponse() - - try: - digest = self.resolve_ref(request.key) - except FileNotFoundError: - with contextlib.suppress(FileNotFoundError): - _remove_path_with_parents(self.refdir, request.key) - - context.set_code(grpc.StatusCode.NOT_FOUND) - return response - - response.digest.hash = digest.hash - response.digest.size_bytes = digest.size_bytes - - return response - - def UpdateReference(self, request, context): - self.logger.debug("%s -> %s", request.keys, request.digest) - response = buildstream_pb2.UpdateReferenceResponse() - - if not self.enable_push: - context.set_code(grpc.StatusCode.PERMISSION_DENIED) - return response - - for key in request.keys: - self.set_ref(key, request.digest) - - return response - - def Status(self, request, context): - self.logger.debug("Retrieving status") - response = buildstream_pb2.StatusResponse() - - response.allow_updates = self.enable_push - - return 
response
