This is an automated email from the ASF dual-hosted git repository. root pushed a commit to branch testing/local-cache-expiry in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit e1cf61345851a5aeb5981b1f70c2ab1434eded34 Author: Tristan Maat <[email protected]> AuthorDate: Wed Mar 28 15:03:42 2018 +0000 Don't expire artifacts that are required for the pipeline --- buildstream/_artifactcache/artifactcache.py | 50 ++++++++++++++++++++++++++++- buildstream/_artifactcache/ostreecache.py | 12 +++++-- buildstream/_artifactcache/tarcache.py | 2 +- buildstream/_pipeline.py | 2 ++ buildstream/_scheduler/queues/buildqueue.py | 2 ++ buildstream/element.py | 10 ++++++ 6 files changed, 73 insertions(+), 5 deletions(-) diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py old mode 100644 new mode 100755 index a7bf679..bf8ff4a --- a/buildstream/_artifactcache/artifactcache.py +++ b/buildstream/_artifactcache/artifactcache.py @@ -21,6 +21,7 @@ import os import string from collections import Mapping, namedtuple +from ..element import _KeyStrength from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason from .._message import Message, MessageType from .. import utils @@ -61,6 +62,7 @@ class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push')): class ArtifactCache(): def __init__(self, context): self.context = context + self.required_artifacts = set() self.extractdir = os.path.join(context.artifactdir, 'extract') self.max_size = context.cache_quota self.estimated_size = None @@ -165,6 +167,38 @@ class ArtifactCache(): (str(provenance))) return cache_specs + # append_required_artifacts(): + # + # Append to the list of elements whose artifacts are required for + # the current run. Artifacts whose elements are in this list will + # be locked by the artifact cache and not touched for the duration + # of the current pipeline. + # + # Args: + # elements (iterable): A set of elements to mark as required + # + def append_required_artifacts(self, elements): + # We lock both strong and weak keys - deleting one but not the + # other won't save space in most cases anyway, but would be a + # user inconvenience. + + for element in elements: + strong_key = element._get_cache_key(strength=_KeyStrength.STRONG) + weak_key = element._get_cache_key(strength=_KeyStrength.WEAK) + + for key in (strong_key, weak_key): + if key and key not in self.required_artifacts: + self.required_artifacts.add(key) + + # We also update the usage times of any artifacts + # we will be using, which helps preventing a + # buildstream process that runs in parallel with + # this one from removing artifacts in-use. + try: + self.update_atime(element, key) + except FileNotFoundError: + pass + # clean(): # # Clean the artifact cache as much as possible. @@ -193,7 +227,9 @@ class ArtifactCache(): else: break - self.remove(to_remove) + key = to_remove.rpartition('/')[2] + if key not in self.required_artifacts: + self.remove(to_remove) # This should be O(1) if implemented correctly return self.calculate_cache_size() @@ -282,6 +318,18 @@ class ArtifactCache(): raise ImplError("Cache '{kind}' does not implement remove()" .format(kind=type(self).__name__)) + # update_atime(): + # + # Update the access time of an element. + # + # Args: + # element (Element): The Element to mark + # key (str): The cache key to use + # + def update_atime(self, element, key): + raise ImplError("Cache '{kind}' does not implement update_atime()" + .format(kind=type(self).__name__)) + # extract(): # # Extract cached artifact for the specified Element if it hasn't diff --git a/buildstream/_artifactcache/ostreecache.py b/buildstream/_artifactcache/ostreecache.py index e0bbc3c..bb8f4fc 100755 --- a/buildstream/_artifactcache/ostreecache.py +++ b/buildstream/_artifactcache/ostreecache.py @@ -103,6 +103,11 @@ class OSTreeCache(ArtifactCache): # correct number of artifacts. self.cache_size -= _ostree.remove(self.repo, artifact_name, defer_prune=False) + def update_atime(self, element, key): + ref = self.get_artifact_fullname(element, key) + ref_file = os.path.join(self.repo.get_path().get_path(), 'refs', 'heads', ref) + os.utime(ref_file) + def extract(self, element, key): ref = self.get_artifact_fullname(element, key) @@ -150,9 +155,7 @@ class OSTreeCache(ArtifactCache): except OSTreeError as e: raise ArtifactError("Failed to commit artifact: {}".format(e)) from e - for ref in refs: - ref_file = os.path.join(self.repo.get_path().get_path(), 'refs', 'heads', ref) - os.utime(ref_file) + self.append_required_artifacts([element]) self.cache_size = None @@ -189,6 +192,9 @@ class OSTreeCache(ArtifactCache): # fetch the artifact from highest priority remote using the specified cache key remote_name = self._ensure_remote(self.repo, remote.pull_url) _ostree.fetch(self.repo, remote=remote_name, ref=ref, progress=progress) + + self.append_required_artifacts([element]) + return True except OSTreeError: # Try next remote diff --git a/buildstream/_artifactcache/tarcache.py b/buildstream/_artifactcache/tarcache.py index ad41e42..4e9f5f9 100644 --- a/buildstream/_artifactcache/tarcache.py +++ b/buildstream/_artifactcache/tarcache.py @@ -86,7 +86,7 @@ class TarCache(ArtifactCache): _Tar.archive(os.path.join(self.tardir, ref), key, temp) self.cache_size = None - os.utime(os.path.join(self.tardir, ref)) + self.append_required_artifacts([element]) # update_atime(): # diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index 9f4504d..7f159c7 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -159,6 +159,8 @@ class Pipeline(): # Determine initial element state. element._update_state() + self._artifacts.append_required_artifacts((e for e in self.dependencies(targets, Scope.ALL))) + # dependencies() # # Generator function to iterate over elements and optionally diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index a7e8e32..9b2cbe6 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -72,6 +72,8 @@ class BuildQueue(Queue): # Inform element in main process that assembly is done element._assemble_done() + # This has to be done after _assemble_done, such that the + # element may register its cache key as required self._check_cache_size(job, element) return True diff --git a/buildstream/element.py b/buildstream/element.py index c5b62e4..518fb59 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -1398,6 +1398,16 @@ class Element(Plugin): workspace.clear_running_files() self._get_context().get_workspaces().save_config() + # We also need to update the required artifacts, since + # workspaced dependencies do not have a fixed cache key + # when the build starts. + # + # This does *not* cause a race condition, because + # _assemble_done is called before a cleanup job may be + # launched. + # + self.__artifacts.append_required_artifacts([self]) + # _assemble(): # # Internal method for running the entire build phase.
