This is an automated email from the ASF dual-hosted git repository. root pushed a commit to branch testing/local-cache-expiry in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 66b25acd197b4739a2db3b27d12a423b18348601 Author: Tristan Maat <[email protected]> AuthorDate: Tue Mar 20 09:24:01 2018 +0000 Calculate the artifact cache size --- buildstream/_artifactcache/artifactcache.py | 70 ++++++++++++++++++++++ buildstream/_artifactcache/ostreecache.py | 12 ++++ buildstream/_artifactcache/tarcache.py | 20 ++++++- buildstream/_scheduler/jobs/__init__.py | 1 + buildstream/_scheduler/jobs/cachesizejob.py | 91 +++++++++++++++++++++++++++++ buildstream/_scheduler/jobs/elementjob.py | 6 ++ buildstream/_scheduler/jobs/job.py | 1 + buildstream/_scheduler/queues/buildqueue.py | 19 +++++- buildstream/_scheduler/queues/fetchqueue.py | 2 +- buildstream/_scheduler/queues/pullqueue.py | 7 ++- buildstream/_scheduler/queues/pushqueue.py | 2 +- buildstream/_scheduler/queues/queue.py | 21 ++++--- buildstream/_scheduler/queues/trackqueue.py | 2 +- buildstream/element.py | 23 +++++++- 14 files changed, 260 insertions(+), 17 deletions(-) diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py index e9611bf..3b2aaf5 100644 --- a/buildstream/_artifactcache/artifactcache.py +++ b/buildstream/_artifactcache/artifactcache.py @@ -62,6 +62,9 @@ class ArtifactCache(): def __init__(self, context): self.context = context self.extractdir = os.path.join(context.artifactdir, 'extract') + self.max_size = context.cache_quota + self.estimated_size = None + self.global_remote_specs = [] self.project_remote_specs = {} @@ -162,6 +165,35 @@ class ArtifactCache(): (str(provenance))) return cache_specs + # get_approximate_cache_size() + # + # A cheap method that aims to serve as an upper limit on the + # artifact cache size. + # + # The cache size reported by this function will normally be larger + # than the real cache size, since it is calculated using the + # pre-commit artifact size, but for very small artifacts in + # certain caches additional overhead could cause this to be + # smaller than, but close to, the actual size. 
+ # + # Nonetheless, in practice this should be safe to use as an upper + # limit on the cache size. + # + # If the cache has built-in constant-time size reporting, please + # feel free to override this method with a more accurate + # implementation. + # + # Returns: + # (int) An approximation of the artifact cache size. + # + def get_approximate_cache_size(self): + # If we don't currently have an estimate, figure out the real + # cache size. + if self.estimated_size is None: + self.estimated_size = self.calculate_cache_size() + + return self.estimated_size + ################################################ # Abstract methods for subclasses to implement # ################################################ @@ -334,6 +366,20 @@ class ArtifactCache(): raise ImplError("Cache '{kind}' does not implement link_key()" .format(kind=type(self).__name__)) + # calculate_cache_size() + # + # Return the real artifact cache size. + # + # Implementations should also use this to update estimated_size. + # + # Returns: + # + # (int) The size of the artifact cache. + # + def calculate_cache_size(self): + raise ImplError("Cache '{kind}' does not implement calculate_cache_size()" + .format(kind=type(self).__name__)) + ################################################ # Local Private Methods # ################################################ @@ -375,6 +421,30 @@ class ArtifactCache(): with self.context.timed_activity("Initializing remote caches", silent_nested=True): self.initialize_remotes(on_failure=remote_failed) + # _add_artifact_size() + # + # Since we cannot keep track of the cache size between processes, + # this method will be called by the main process every time a + # process that added something to the cache finishes. + # + # This will then add the reported size to + # ArtifactCache.estimated_size.
+ # + def _add_artifact_size(self, artifact_size): + if not self.estimated_size: + self.estimated_size = self.calculate_cache_size() + + self.estimated_size += artifact_size + + # _set_cache_size() + # + # Similarly to the above method, when we calculate the actual size + # in a child process, we can't update it. We instead pass the value + # back to the main process and update it there. + # + def _set_cache_size(self, cache_size): + self.estimated_size = cache_size + # _configured_remote_artifact_cache_specs(): # diff --git a/buildstream/_artifactcache/ostreecache.py b/buildstream/_artifactcache/ostreecache.py old mode 100644 new mode 100755 index f71bee0..8d22917 --- a/buildstream/_artifactcache/ostreecache.py +++ b/buildstream/_artifactcache/ostreecache.py @@ -59,6 +59,9 @@ class OSTreeCache(ArtifactCache): self._has_fetch_remotes = False self._has_push_remotes = False + # A cached artifact cache size (irony?) + self.cache_size = None + ################################################ # Implementation of abstract methods # ################################################ @@ -137,6 +140,8 @@ class OSTreeCache(ArtifactCache): except OSTreeError as e: raise ArtifactError("Failed to commit artifact: {}".format(e)) from e + self.cache_size = None + def can_diff(self): return True @@ -204,6 +209,13 @@ class OSTreeCache(ArtifactCache): return any_pushed + def calculate_cache_size(self): + if self.cache_size is None: + self.cache_size = utils._get_dir_size(self.repo.get_path().get_path()) + self.estimated_size = self.cache_size + + return self.cache_size + def initialize_remotes(self, *, on_failure=None): remote_specs = self.global_remote_specs.copy() diff --git a/buildstream/_artifactcache/tarcache.py b/buildstream/_artifactcache/tarcache.py index d995929..3cae9d9 100644 --- a/buildstream/_artifactcache/tarcache.py +++ b/buildstream/_artifactcache/tarcache.py @@ -36,6 +36,7 @@ class TarCache(ArtifactCache): self.tardir = os.path.join(context.artifactdir, 'tar')
os.makedirs(self.tardir, exist_ok=True) + self.cache_size = None ################################################ # Implementation of abstract methods # @@ -52,7 +53,7 @@ class TarCache(ArtifactCache): artifact = os.path.join(self.tardir, artifact_name + '.tar.bz2') size = os.stat(artifact, follow_symlinks=False).st_size os.remove(artifact) - return size + self.cache_size -= size def commit(self, element, content, keys): os.makedirs(os.path.join(self.tardir, element._get_project().name, element.normal_name), exist_ok=True) @@ -66,6 +67,8 @@ class TarCache(ArtifactCache): _Tar.archive(os.path.join(self.tardir, ref), key, temp) + self.cache_size = None + def extract(self, element, key): fullname = self.get_artifact_fullname(element, key) @@ -100,6 +103,21 @@ class TarCache(ArtifactCache): return dest + # calculate_cache_size() + # + # Return the artifact cache size. + # + # Returns: + # + # (int) The size of the artifact cache. + # + def calculate_cache_size(self): + if self.cache_size is None: + self.cache_size = utils._get_dir_size(self.tardir) + self.estimated_size = self.cache_size + + return self.cache_size + # _tarpath() # diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py index 0030f5c..9815586 100644 --- a/buildstream/_scheduler/jobs/__init__.py +++ b/buildstream/_scheduler/jobs/__init__.py @@ -1 +1,2 @@ from .elementjob import ElementJob +from .cachesizejob import CacheSizeJob diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py new file mode 100644 index 0000000..897e896 --- /dev/null +++ b/buildstream/_scheduler/jobs/cachesizejob.py @@ -0,0 +1,91 @@ +# Copyright (C) 2018 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version.
+# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Author: +# Tristan Daniël Maat <[email protected]> +# +import os +from contextlib import contextmanager + +from .job import Job +from ..._platform import Platform +from ..._message import Message, MessageType + + +class CacheSizeJob(Job): + def __init__(self, *args, complete_cb, **kwargs): + super().__init__(*args, **kwargs) + self._complete_cb = complete_cb + self._cache = Platform._instance.artifactcache + + def _child_process(self): + return self._cache.calculate_cache_size() + + def _parent_complete(self, success, result): + self._cache._set_cache_size(result) + if self._complete_cb: + self._complete_cb(result) + + @contextmanager + def _child_logging_enabled(self, logfile): + self._logfile = logfile.format(pid=os.getpid()) + yield self._logfile + self._logfile = None + + # _message(): + # + # Sends a message to the frontend + # + # Args: + # message_type (MessageType): The type of message to send + # message (str): The message + # kwargs: Remaining Message() constructor arguments + # + def _message(self, message_type, message, **kwargs): + args = dict(kwargs) + args['scheduler'] = True + self._scheduler.context.message(Message(None, message_type, message, **args)) + + def _child_log(self, message): + with open(self._logfile, 'a+') as log: + INDENT = " " + EMPTYTIME = "--:--:--" + + template = "[{timecode: <8}] {type: <7} {name: <15}: {message}" + detail = '' + if message.detail is not None: + template += "\n\n{detail}" + detail = message.detail.rstrip('\n') + detail = INDENT + INDENT.join(detail.splitlines(True)) + + timecode = EMPTYTIME + if
message.message_type in (MessageType.SUCCESS, MessageType.FAIL): + hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2) + minutes, seconds = divmod(remainder, 60) + timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds) + + message_text = template.format(timecode=timecode, + type=message.message_type.upper(), + name='cache_size', + message=message.message, + detail=detail) + + log.write('{}\n'.format(message_text)) + log.flush() + + return message + + def _child_process_data(self): + return {} diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py index 36e7c1d..45c16eb 100644 --- a/buildstream/_scheduler/jobs/elementjob.py +++ b/buildstream/_scheduler/jobs/elementjob.py @@ -210,7 +210,13 @@ class ElementJob(Job): data = {} workspace = self._element._get_workspace() + artifact_size = self._element._get_artifact_size() + cache_size = self._element._get_artifact_cache().cache_size + if workspace is not None: data['workspace'] = workspace.to_dict() + if artifact_size is not None: + data['artifact_size'] = artifact_size + data['cache_size'] = cache_size return data diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index cf5bf07..fb25901 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -58,6 +58,7 @@ class JobType(): BUILD = 3 PULL = 4 PUSH = 5 + SIZE = 6 # Job() diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index 0c75538..c023973 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -20,7 +20,7 @@ import os from . 
import Queue, QueueStatus, QueueType -from ..jobs import JobType +from ..jobs import CacheSizeJob, JobType # A queue which assembles elements @@ -53,10 +53,25 @@ class BuildQueue(Queue): return QueueStatus.READY - def done(self, element, result, success): + def _check_cache_size(self, job, element): + if not job.child_data: + return + + artifact_size = job.child_data.get('artifact_size', False) + + if artifact_size: + cache = element._get_artifact_cache() + cache._add_artifact_size(artifact_size) + + if cache.get_approximate_cache_size() > self._scheduler.context.cache_quota: + self._scheduler._check_cache_size_real() + + def done(self, job, element, result, success): if success: # Inform element in main process that assembly is done element._assemble_done() + self._check_cache_size(job, element) + return True diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index 2438a9a..a4ec8ad 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -68,7 +68,7 @@ class FetchQueue(Queue): return QueueStatus.READY - def done(self, element, result, success): + def done(self, _, element, result, success): if not success: return False diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py index 1fc4364..20e7843 100644 --- a/buildstream/_scheduler/queues/pullqueue.py +++ b/buildstream/_scheduler/queues/pullqueue.py @@ -53,13 +53,18 @@ class PullQueue(Queue): else: return QueueStatus.SKIP - def done(self, element, result, success): + def done(self, _, element, result, success): if not success: return False element._pull_done() + # Build jobs will check the "approximate" size first. Since we + # do not get an artifact size from pull jobs, we have to + # actually check the cache size. 
+ self._scheduler._check_cache_size_real() + # Element._pull() returns True if it downloaded an artifact, # here we want to appear skipped if we did not download. return result diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py index aa5540e..77f2e02 100644 --- a/buildstream/_scheduler/queues/pushqueue.py +++ b/buildstream/_scheduler/queues/pushqueue.py @@ -42,7 +42,7 @@ class PushQueue(Queue): return QueueStatus.READY - def done(self, element, result, success): + def done(self, _, element, result, success): if not success: return False diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 39e431b..2ff10d8 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -145,6 +145,7 @@ class Queue(): # Abstract method for handling a successful job completion. # # Args: + # job (Job): The job which completed processing # element (Element): The element which completed processing # result (any): The return value of the process() implementation # success (bool): True if the process() implementation did not @@ -154,7 +155,7 @@ class Queue(): # (bool): True if the element should appear to be processsed, # Otherwise False will count the element as "skipped" # - def done(self, element, result, success): + def done(self, job, element, result, success): pass ##################################################### @@ -224,6 +225,7 @@ class Queue(): def process_ready(self): scheduler = self._scheduler unready = [] + ready = [] while self._wait_queue and scheduler.get_job_token(self.queue_type): element = self._wait_queue.popleft() @@ -248,12 +250,14 @@ class Queue(): action_cb=self.process, complete_cb=self._job_done, max_retries=self._max_retries) - scheduler.job_starting(job, element) + ready.append(job) # These were not ready but were in the beginning, give em # first priority again next time around self._wait_queue.extendleft(unready) + return 
ready + ##################################################### # Private Methods # ##################################################### @@ -270,7 +274,7 @@ class Queue(): def _update_workspaces(self, element, job): workspace_dict = None if job.child_data: - workspace_dict = job.child_data['workspace'] + workspace_dict = job.child_data.get('workspace', None) # Handle any workspace modifications now # @@ -298,17 +302,17 @@ class Queue(): # def _job_done(self, job, element, success, result): - # Remove from our jobs - self.active_jobs.remove(job) - - # Update workspaces in the main task before calling any queue implementation + # Update values that need to be synchronized in the main task + # before calling any queue implementation self._update_workspaces(element, job) + if job.child_data: + element._get_artifact_cache().cache_size = job.child_data.get('cache_size') # Give the result of the job to the Queue implementor, # and determine if it should be considered as processed # or skipped. try: - processed = self.done(element, result, success) + processed = self.done(job, element, result, success) except BstError as e: @@ -346,7 +350,6 @@ class Queue(): self.failed_elements.append(element) # Give the token for this job back to the scheduler - # immediately before invoking another round of scheduling self._scheduler.put_job_token(self.queue_type) # Convenience wrapper for Queue implementations to send diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py index a371e52..df3f7b1 100644 --- a/buildstream/_scheduler/queues/trackqueue.py +++ b/buildstream/_scheduler/queues/trackqueue.py @@ -49,7 +49,7 @@ class TrackQueue(Queue): return QueueStatus.READY - def done(self, element, result, success): + def done(self, _, element, result, success): if not success: return False diff --git a/buildstream/element.py b/buildstream/element.py index fc21f80..98a39a4 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -225,6 
+225,7 @@ class Element(Plugin): self.__staged_sources_directory = None # Location where Element.stage_sources() was called self.__tainted = None # Whether the artifact is tainted and should not be shared self.__required = False # Whether the artifact is required in the current session + self.__artifact_size = None # The size of data committed to the artifact cache # hash tables of loaded artifact metadata, hashed by key self.__metadata_keys = {} # Strong and weak keys for this key @@ -1524,7 +1525,8 @@ class Element(Plugin): }), os.path.join(metadir, 'workspaced-dependencies.yaml')) with self.timed_activity("Caching artifact"): - self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit()) + self.__artifacts._commit(self, assembledir, self.__get_cache_keys_for_commit()) + self.__artifact_size = utils._get_dir_size(assembledir) # Finally cleanup the build dir cleanup_rootdir() @@ -1763,6 +1765,25 @@ class Element(Plugin): workspaces = self._get_context().get_workspaces() return workspaces.get_workspace(self._get_full_name()) + # _get_artifact_size() + # + # Get the size of the artifact produced by this element in the + # current pipeline - if this element has not been assembled or + # pulled, this will be None. + # + # Note that this is the size of an artifact *before* committing it + # to the cache, the size on disk may differ. It can act as an + # approximate guide for when to do a proper size calculation. + # + # Returns: + # (int|None): The size of the artifact + # + def _get_artifact_size(self): + return self.__artifact_size + + def _get_artifact_cache(self): + return self.__artifacts + # _write_script(): # # Writes a script to the given directory.
