This is an automated email from the ASF dual-hosted git repository. root pushed a commit to branch testing/local-cache-expiry in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 9e9dad60a302c2d51d736af8621d18c93416222e Author: Tristan Maat <[email protected]> AuthorDate: Sat Mar 24 17:56:56 2018 +0000 Automatically delete artifacts when we run out of space --- buildstream/_artifactcache/artifactcache.py | 47 ++++++++++++++++++- buildstream/_artifactcache/ostreecache.py | 18 +++++++- buildstream/_artifactcache/tarcache.py | 31 +++++++++++++ buildstream/_exceptions.py | 4 +- buildstream/_ostree.py | 2 + buildstream/_scheduler/jobs/__init__.py | 1 + buildstream/_scheduler/jobs/cleanupjob.py | 71 +++++++++++++++++++++++++++++ buildstream/_scheduler/jobs/job.py | 1 + buildstream/_scheduler/queues/buildqueue.py | 2 +- buildstream/_scheduler/scheduler.py | 1 + buildstream/element.py | 2 +- buildstream/utils.py | 14 +++--- 12 files changed, 181 insertions(+), 13 deletions(-) diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py index 3b2aaf5..a7bf679 100644 --- a/buildstream/_artifactcache/artifactcache.py +++ b/buildstream/_artifactcache/artifactcache.py @@ -21,7 +21,7 @@ import os import string from collections import Mapping, namedtuple -from .._exceptions import ImplError, LoadError, LoadErrorReason +from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason from .._message import Message, MessageType from .. import utils from .. import _yaml @@ -165,6 +165,39 @@ class ArtifactCache(): (str(provenance))) return cache_specs + # clean(): + # + # Clean the artifact cache as much as possible. + # + def clean(self): + artifacts = self.list_artifacts() + + while self.calculate_cache_size() >= self.context.cache_quota - self.context.cache_lower_threshold: + try: + to_remove = artifacts.pop(0) + except IndexError: + # If too many artifacts are required, and we therefore + # can't remove them, we have to abort the build. 
+ # + # FIXME: Asking the user what to do may be neater + default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'], + 'buildstream.conf') + detail = ("There is not enough space to build the given element.\n" + "Please increase the cache-quota in {}." + .format(self.context.config_origin or default_conf)) + + if self.calculate_cache_size() > self.context.cache_quota: + raise ArtifactError("Cache too full. Aborting.", + detail=detail, + reason="cache-too-full") + else: + break + + self.remove(to_remove) + + # This should be O(1) if implemented correctly + return self.calculate_cache_size() + # get_approximate_cache_size() # # A cheap method that aims to serve as an upper limit on the @@ -223,6 +256,18 @@ class ArtifactCache(): raise ImplError("Cache '{kind}' does not implement contains()" .format(kind=type(self).__name__)) + # list_artifacts(): + # + # List artifacts in this cache in LRU order. + # + # Returns: + # ([str]) - A list of artifact names as generated by + # `ArtifactCache.get_artifact_fullname` in LRU order + # + def list_artifacts(self): + raise ImplError("Cache '{kind}' does not implement list_artifacts()" + .format(kind=type(self).__name__)) + # remove(): # # Removes the artifact for the specified ref from the local diff --git a/buildstream/_artifactcache/ostreecache.py b/buildstream/_artifactcache/ostreecache.py index 8d22917..e0bbc3c 100755 --- a/buildstream/_artifactcache/ostreecache.py +++ b/buildstream/_artifactcache/ostreecache.py @@ -93,8 +93,15 @@ class OSTreeCache(ArtifactCache): ref = self.get_artifact_fullname(element, key) return _ostree.exists(self.repo, ref) - def remove(self, ref): - return _ostree.remove(self.repo, ref) + def list_artifacts(self): + return _ostree.list_artifacts(self.repo) + + def remove(self, artifact_name): + # We cannot defer pruning, unfortunately, because we could + # otherwise not figure out how much space was freed by the + # removal, and would therefore not be able to expire the + # correct number of artifacts. 
+ self.cache_size -= _ostree.remove(self.repo, artifact_name, defer_prune=False) def extract(self, element, key): ref = self.get_artifact_fullname(element, key) @@ -105,6 +112,9 @@ class OSTreeCache(ArtifactCache): # Extracting a nonexistent artifact is a bug assert rev, "Artifact missing for {}".format(ref) + ref_file = os.path.join(self.repo.get_path().get_path(), 'refs', 'heads', ref) + os.utime(ref_file) + dest = os.path.join(self.extractdir, element._get_project().name, element.normal_name, rev) if os.path.isdir(dest): # artifact has already been extracted @@ -140,6 +150,10 @@ class OSTreeCache(ArtifactCache): except OSTreeError as e: raise ArtifactError("Failed to commit artifact: {}".format(e)) from e + for ref in refs: + ref_file = os.path.join(self.repo.get_path().get_path(), 'refs', 'heads', ref) + os.utime(ref_file) + self.cache_size = None def can_diff(self): diff --git a/buildstream/_artifactcache/tarcache.py b/buildstream/_artifactcache/tarcache.py index 3cae9d9..ad41e42 100644 --- a/buildstream/_artifactcache/tarcache.py +++ b/buildstream/_artifactcache/tarcache.py @@ -45,6 +45,24 @@ class TarCache(ArtifactCache): path = os.path.join(self.tardir, _tarpath(element, key)) return os.path.isfile(path) + # list_artifacts(): + # + # List artifacts in this cache in LRU order. + # + # Returns: + # (list) - A list of refs in LRU order + # + def list_artifacts(self): + artifacts = list(utils.list_relative_paths(self.tardir, list_dirs=False)) + mtimes = [os.path.getmtime(os.path.join(self.tardir, artifact)) + for artifact in artifacts if artifact] + + # We need to get rid of the tarfile extension to get a proper + # ref - os.splitext doesn't do this properly, unfortunately. + artifacts = [artifact[:-len('.tar.bz2')] for artifact in artifacts] + + return [name for _, name in sorted(zip(mtimes, artifacts))] + # remove() # # Implements artifactcache.remove(). 
@@ -68,6 +86,19 @@ class TarCache(ArtifactCache): _Tar.archive(os.path.join(self.tardir, ref), key, temp) self.cache_size = None + os.utime(os.path.join(self.tardir, ref)) + + # update_atime(): + # + # Update the access time of an element. + # + # Args: + # element (Element): The Element to mark + # key (str): The cache key to use + # + def update_atime(self, element, key): + path = _tarpath(element, key) + os.utime(os.path.join(self.tardir, path)) def extract(self, element, key): diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py index 3aadd52..96d634b 100644 --- a/buildstream/_exceptions.py +++ b/buildstream/_exceptions.py @@ -246,8 +246,8 @@ class SandboxError(BstError): # Raised when errors are encountered in the artifact caches # class ArtifactError(BstError): - def __init__(self, message, reason=None): - super().__init__(message, domain=ErrorDomain.ARTIFACT, reason=reason) + def __init__(self, message, *, detail=None, reason=None): + super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason) # PipelineError diff --git a/buildstream/_ostree.py b/buildstream/_ostree.py index 238c6b4..d060960 100644 --- a/buildstream/_ostree.py +++ b/buildstream/_ostree.py @@ -565,6 +565,8 @@ def list_artifacts(repo): ref_heads = os.path.join(repo.get_path().get_path(), 'refs', 'heads') # obtain list of <project>/<element>/<key> + # FIXME: ostree 2017.11+ supports a flag that would allow + # listing only local refs refs = _list_all_refs(repo).keys() mtimes = [] diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py index 9815586..185d825 100644 --- a/buildstream/_scheduler/jobs/__init__.py +++ b/buildstream/_scheduler/jobs/__init__.py @@ -1,2 +1,3 @@ from .elementjob import ElementJob from .cachesizejob import CacheSizeJob +from .cleanupjob import CleanupJob diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py new file mode 100644 index 
0000000..3ae635a --- /dev/null +++ b/buildstream/_scheduler/jobs/cleanupjob.py @@ -0,0 +1,71 @@ +# Copyright (C) 2018 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Author: +# Tristan Daniël Maat <[email protected]> +# +import os +from contextlib import contextmanager + +from .job import Job +from ..._platform import Platform +from ..._message import Message + + +class CleanupJob(Job): + def __init__(self, *args, complete_cb, **kwargs): + super().__init__(*args, **kwargs) + self._complete_cb = complete_cb + self._cache = Platform._instance.artifactcache + + def _child_process(self): + return self._cache.clean() + + def _parent_complete(self, success, result): + self._cache._set_cache_size(result) + self._complete_cb() + + @contextmanager + def _child_logging_enabled(self, logfile): + self._logfile = logfile.format(pid=os.getpid()) + yield self._logfile + self._logfile = None + + # _message(): + # + # Sends a message to the frontend + # + # Args: + # message_type (MessageType): The type of message to send + # message (str): The message + # kwargs: Remaining Message() constructor arguments + # + def _message(self, message_type, message, **kwargs): + args = dict(kwargs) + args['scheduler'] = True + self._scheduler.context.message(Message(None, message_type, message, **args)) + + def _child_log(self, message): + message.action_name = 
self.action_name + + with open(self._logfile, 'a+') as log: + message_text = self._format_frontend_message(message, '[cleanup]') + log.write('{}\n'.format(message_text)) + log.flush() + + return message + + def _child_process_data(self): + return {} diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index fb25901..01b7107 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -59,6 +59,7 @@ class JobType(): PULL = 4 PUSH = 5 SIZE = 6 + CLEAN = 7 # Job() diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index c023973..a7e8e32 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -20,7 +20,7 @@ import os from . import Queue, QueueStatus, QueueType -from ..jobs import CacheSizeJob, JobType +from ..jobs import CacheSizeJob, CleanupJob, JobType # A queue which assembles elements diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index c33c9d1..ec73620 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -146,6 +146,7 @@ class Scheduler(): self._starttime = start_time self._suspendtime = None self._queue_jobs = True # Whether we should continue to queue jobs + self._start_cleanup = False # Whether we would like to launch a cleanup job # Initialize task tokens with the number allowed by # the user configuration diff --git a/buildstream/element.py b/buildstream/element.py index 98a39a4..c5b62e4 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -1525,8 +1525,8 @@ class Element(Plugin): }), os.path.join(metadir, 'workspaced-dependencies.yaml')) with self.timed_activity("Caching artifact"): - self.__artifacts._commit(self, assembledir, self.__get_cache_keys_for_commit()) self.__artifact_size = utils._get_dir_size(assembledir) + self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit()) 
# Finally cleanup the build dir cleanup_rootdir() diff --git a/buildstream/utils.py b/buildstream/utils.py index 1cdf575..e8270d8 100644 --- a/buildstream/utils.py +++ b/buildstream/utils.py @@ -96,7 +96,7 @@ class FileListResult(): return ret -def list_relative_paths(directory): +def list_relative_paths(directory, *, list_dirs=True): """A generator for walking directory relative paths This generator is useful for checking the full manifest of @@ -110,6 +110,7 @@ def list_relative_paths(directory): Args: directory (str): The directory to list files in + list_dirs (bool): Whether to list directories Yields: Relative filenames in `directory` @@ -136,15 +137,16 @@ def list_relative_paths(directory): # subdirectories in the walked `dirpath`, so we extract # these symlinks from `dirnames` # - for d in dirnames: - fullpath = os.path.join(dirpath, d) - if os.path.islink(fullpath): - yield os.path.join(basepath, d) + if list_dirs: + for d in dirnames: + fullpath = os.path.join(dirpath, d) + if os.path.islink(fullpath): + yield os.path.join(basepath, d) # We've decended into an empty directory, in this case we # want to include the directory itself, but not in any other # case. - if not filenames: + if list_dirs and not filenames: yield relpath # List the filenames in the walked directory
