This is an automated email from the ASF dual-hosted git repository. root pushed a commit to branch testing/local-cache-expiry in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 4fcd69510182f182d980a2e232660e0bf906d883 Author: Tristan Maat <[email protected]> AuthorDate: Wed Jul 11 10:55:00 2018 +0100 queue.py: Introduce Resources --- buildstream/_scheduler/queues/queue.py | 92 ++++++++++++++++------------------ 1 file changed, 42 insertions(+), 50 deletions(-) diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 2ff10d8..ac20d37 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -26,26 +26,13 @@ import traceback # Local imports from ..jobs import ElementJob +from ..resources import ResourceType # BuildStream toplevel imports from ..._exceptions import BstError, set_last_task_error from ..._message import Message, MessageType -# Indicates the kind of activity -# -# -class QueueType(): - # Tasks which download stuff from the internet - FETCH = 1 - - # CPU/Disk intensive tasks - BUILD = 2 - - # Tasks which upload stuff to the internet - PUSH = 3 - - # Queue status for a given element # # @@ -70,15 +57,13 @@ class Queue(): # These should be overridden on class data of of concrete Queue implementations action_name = None complete_name = None - queue_type = None - job_type = None + resources = [] # Resources this queues' jobs want def __init__(self, scheduler): # # Public members # - self.active_jobs = [] # List of active ongoing Jobs, for scheduler observation self.failed_elements = [] # List of failed elements, for the frontend self.processed_elements = [] # List of processed elements, for the frontend self.skipped_elements = [] # List of skipped elements, for the frontend @@ -90,13 +75,13 @@ class Queue(): self._wait_queue = deque() self._done_queue = deque() self._max_retries = 0 - if self.queue_type == QueueType.FETCH or self.queue_type == QueueType.PUSH: - self._max_retries = scheduler.context.sched_network_retries # Assert the subclass has setup class data assert self.action_name is not None assert self.complete_name is not None - assert self.queue_type is not None + + if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources: + self._max_retries = scheduler.context.sched_network_retries ##################################################### # Abstract Methods for Queue implementations # @@ -173,10 +158,22 @@ class Queue(): if not elts: return + # Note: The internal lists work with jobs. This is not + # reflected in any external methods (except + # pop/peek_ready_jobs). + def create_job(element): + logfile = self._element_log_path(element) + return ElementJob(self._scheduler, self.action_name, + logfile, element=element, queue=self, + resources=self.resources, + action_cb=self.process, + complete_cb=self._job_done, + max_retries=self._max_retries) + # Place skipped elements directly on the done queue - elts = list(elts) - skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP] - wait = [elt for elt in elts if elt not in skip] + jobs = [create_job(elt) for elt in elts] + skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP] + wait = [job for job in jobs if job not in skip] self._wait_queue.extend(wait) self._done_queue.extend(skip) @@ -192,7 +189,7 @@ class Queue(): # def dequeue(self): while self._done_queue: - yield self._done_queue.popleft() + yield self._done_queue.popleft().element # dequeue_ready() # @@ -204,7 +201,10 @@ class Queue(): def dequeue_ready(self): return any(self._done_queue) - # process_ready() + # pop_ready_jobs() + # + # Returns: + # ([Job]): A list of jobs to run # # Process elements in the queue, moving elements which were enqueued # into the dequeue pool, and processing them if necessary. @@ -214,42 +214,31 @@ class Queue(): # # o Elements which are QueueStatus.WAIT will not be effected # - # o Elements which are QueueStatus.READY will be processed - # and added to the Queue.active_jobs list as a result, - # given that the scheduler allows the Queue enough tokens - # for the given queue's job type - # # o Elements which are QueueStatus.SKIP will move directly # to the dequeue pool # - def process_ready(self): - scheduler = self._scheduler + # o For Elements which are QueueStatus.READY a Job will be + # created and returned to the caller, given that the scheduler + # allows the Queue enough resources for the given job + # + def pop_ready_jobs(self): unready = [] ready = [] - while self._wait_queue and scheduler.get_job_token(self.queue_type): - element = self._wait_queue.popleft() + while self._wait_queue: + job = self._wait_queue.popleft() + element = job.element status = self.status(element) if status == QueueStatus.WAIT: - scheduler.put_job_token(self.queue_type) - unready.append(element) + unready.append(job) continue elif status == QueueStatus.SKIP: - scheduler.put_job_token(self.queue_type) - self._done_queue.append(element) + self._done_queue.append(job) self.skipped_elements.append(element) continue - logfile = self._element_log_path(element) self.prepare(element) - - job = ElementJob(scheduler, self.job_type, - self.action_name, logfile, - element=element, queue=self, - action_cb=self.process, - complete_cb=self._job_done, - max_retries=self._max_retries) ready.append(job) # These were not ready but were in the beginning, give em @@ -258,6 +247,12 @@ class Queue(): return ready + def peek_ready_jobs(self): + def ready(job): + return self.status(job.element) == QueueStatus.READY + + yield from (job for job in self._wait_queue if ready(job)) + ##################################################### # Private Methods # ##################################################### @@ -341,7 +336,7 @@ class Queue(): # No exception occured, handle the success/failure state in the normal way # if success: - self._done_queue.append(element) + self._done_queue.append(job) if processed: self.processed_elements.append(element) else: @@ -349,9 +344,6 @@ class Queue(): else: self.failed_elements.append(element) - # Give the token for this job back to the scheduler - self._scheduler.put_job_token(self.queue_type) - # Convenience wrapper for Queue implementations to send # a message for the element they are processing def _message(self, element, message_type, brief, **kwargs):
