This is an automated email from the ASF dual-hosted git repository. github-bot pushed a commit to branch aevri/picklable_jobs in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit fe0540800de89ab8d0de20c563a8134c2eae32b6 Author: Angelos Evripiotis <[email protected]> AuthorDate: Tue Apr 9 13:31:39 2019 +0100 WIP: perf: don't bind entire queue to callback --- src/buildstream/_scheduler/queues/artifactpushqueue.py | 11 +++++++---- src/buildstream/_scheduler/queues/buildqueue.py | 10 +++++++--- src/buildstream/_scheduler/queues/fetchqueue.py | 15 +++++++++++++-- src/buildstream/_scheduler/queues/pullqueue.py | 11 +++++++---- src/buildstream/_scheduler/queues/queue.py | 18 +++--------------- src/buildstream/_scheduler/queues/sourcepushqueue.py | 11 +++++++---- src/buildstream/_scheduler/queues/trackqueue.py | 8 ++++++-- 7 files changed, 50 insertions(+), 34 deletions(-) diff --git a/src/buildstream/_scheduler/queues/artifactpushqueue.py b/src/buildstream/_scheduler/queues/artifactpushqueue.py index 5e9a7e8..dc6300e 100644 --- a/src/buildstream/_scheduler/queues/artifactpushqueue.py +++ b/src/buildstream/_scheduler/queues/artifactpushqueue.py @@ -38,13 +38,16 @@ class ArtifactPushQueue(Queue): del state['_scheduler'] return state - def process(self, element): - # returns whether an artifact was uploaded or not - if not element._push(): - raise SkipJob(self.action_name) + def get_process_func(self): + return _raise_skip_if_not_pushed def status(self, element): if element._skip_push(): return QueueStatus.SKIP return QueueStatus.READY + + +def _raise_skip_if_not_pushed(element): + if not element._push(): + raise SkipJob(ArtifactPushQueue.action_name) diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py index 34394fa..5336416 100644 --- a/src/buildstream/_scheduler/queues/buildqueue.py +++ b/src/buildstream/_scheduler/queues/buildqueue.py @@ -63,7 +63,7 @@ class BuildQueue(Queue): logfile=logfile) job = ElementJob(self._scheduler, self.action_name, logfile, element=element, queue=self, - action_cb=self.process, + action_cb=self.get_process_func(), complete_cb=self._job_done, max_retries=self._max_retries) self._done_queue.append(element) @@ -72,8 +72,8 @@ class BuildQueue(Queue): return super().enqueue(to_queue) - def process(self, element): - return element._assemble() + def get_process_func(self): + return _assemble_element def status(self, element): if not element._is_required(): @@ -122,3 +122,7 @@ class BuildQueue(Queue): # if status == JobStatus.OK: # self._check_cache_size(job, element, result) + + +def _assemble_element(element): + return element._assemble() diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py index 50ad2b8..7ee962f 100644 --- a/src/buildstream/_scheduler/queues/fetchqueue.py +++ b/src/buildstream/_scheduler/queues/fetchqueue.py @@ -47,8 +47,11 @@ class FetchQueue(Queue): self._skip_cached = skip_cached self._fetch_original = fetch_original - def process(self, element): - element._fetch(fetch_original=self._fetch_original) + def get_process_func(self): + if self._fetch_original: + return _fetch_original + else: + return _fetch_no_original def status(self, element): if not element._is_required(): @@ -84,3 +87,11 @@ class FetchQueue(Queue): assert element._get_consistency() == Consistency.CACHED else: assert element._source_cached() + + +def _fetch_no_original(element): + element._fetch(fetch_original=False) + + +def _fetch_original(element): + element._fetch(fetch_original=True) diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py index 013ee64..c54c4f5 100644 --- a/src/buildstream/_scheduler/queues/pullqueue.py +++ b/src/buildstream/_scheduler/queues/pullqueue.py @@ -33,10 +33,8 @@ class PullQueue(Queue): complete_name = "Pulled" resources = [ResourceType.DOWNLOAD, ResourceType.CACHE] - def process(self, element): - # returns whether an artifact was downloaded or not - if not element._pull(): - raise SkipJob(self.action_name) + def get_process_func(self): + return _raise_skip_if_not_pulled def status(self, element): if not element._is_required(): @@ -64,3 +62,8 @@ class PullQueue(Queue): # actually check the cache size. if status == JobStatus.OK: self._scheduler.check_cache_size() + + +def _raise_skip_if_not_pulled(element): + if not element._pull(): + raise SkipJob(PullQueue.action_name) diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 1efcffc..cb2ea82 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -88,20 +88,8 @@ class Queue(): # Abstract Methods for Queue implementations # ##################################################### - # process() - # - # Abstract method for processing an element - # - # Args: - # element (Element): An element to process - # - # Returns: - # (any): An optional something to be returned - # for every element successfully processed - # - # - def process(self, element): - pass + def get_process_func(self): + raise NotImplementedError() # status() # @@ -215,7 +203,7 @@ class Queue(): ElementJob(self._scheduler, self.action_name, self._element_log_path(element), element=element, queue=self, - action_cb=self.process, + action_cb=self.get_process_func(), complete_cb=self._job_done, max_retries=self._max_retries) for element in ready diff --git a/src/buildstream/_scheduler/queues/sourcepushqueue.py b/src/buildstream/_scheduler/queues/sourcepushqueue.py index c38460e..92587d6 100644 --- a/src/buildstream/_scheduler/queues/sourcepushqueue.py +++ b/src/buildstream/_scheduler/queues/sourcepushqueue.py @@ -30,13 +30,16 @@ class SourcePushQueue(Queue): complete_name = "Sources pushed" resources = [ResourceType.UPLOAD] - def process(self, element): - # Returns whether a source was pushed or not - if not element._source_push(): - raise SkipJob(self.action_name) + def get_process_func(self): + return _raise_skip_if_not_pushed def status(self, element): if element._skip_source_push(): return QueueStatus.SKIP return QueueStatus.READY + + +def _raise_skip_if_not_pushed(element): + if not element._source_push(): + raise SkipJob(SourcePushQueue.action_name) diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py index 72a79a5..4048072 100644 --- a/src/buildstream/_scheduler/queues/trackqueue.py +++ b/src/buildstream/_scheduler/queues/trackqueue.py @@ -35,8 +35,8 @@ class TrackQueue(Queue): complete_name = "Tracked" resources = [ResourceType.DOWNLOAD] - def process(self, element): - return element._track() + def get_process_func(self, element): + return _track_element def status(self, element): # We can skip elements entirely if they have no sources. @@ -60,3 +60,7 @@ class TrackQueue(Queue): source._set_ref(new_ref, save=True) element._tracking_done() + + +def _track_element(element): + return element._track()
