This is an automated email from the ASF dual-hosted git repository. tvb pushed a commit to branch tristan/remove-plan-selection-mode in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 3d75a9623427f6fe7cf9ce8c0368546a9b796cd0 Author: Tristan van Berkom <[email protected]> AuthorDate: Tue Oct 5 15:27:00 2021 +0900 _scheduler/queues/queue.py: Prioritize enqueue order The queue processing order was modified long ago to ignore the insertion order and use the depth assigned with Element._set_depth(). This patch adds the insertion order as the second criterion for sorting elements (after depth), rather than falling back on processing elements alphabetically. This changes the behavior of non-build sessions such that elements are processed in an order similar to the order in which they are listed in the pipeline summaries. This consequently also allows us to reuse some of the expected ordering in some test cases which were using `--deps plan`, which is being removed. --- src/buildstream/_scheduler/queues/queue.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 3859572..3c83a9a 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -75,6 +75,7 @@ class Queue: self._ready_queue = [] # Ready elements self._done_queue = deque() # Processed / Skipped elements self._max_retries = 0 + self._queued_elements = 0 # Number of elements queued self._required_element_check = False # Whether we should check that elements are required before enqueuing @@ -217,6 +218,10 @@ class Queue: # Spawn as many jobs from the ready queue for which resources # can be reserved. # + # Priority is first given to elements which have been assigned a lower + # depth (see Element._set_depth()), and then to elements which have + # been enqueued earlier. 
+ # # Returns: # ([Job]): A list of jobs which can be run now # @@ -228,7 +233,7 @@ class Queue: if not reserved: break - _, element = heapq.heappop(self._ready_queue) + _, _, element = heapq.heappop(self._ready_queue) ready.append(element) return [ @@ -378,13 +383,15 @@ class Queue: # def _enqueue_element(self, element): status = self.status(element) + if status == QueueStatus.SKIP: # Place skipped elements into the done queue immediately self._task_group.add_skipped_task() self._done_queue.append(element) # Elements to proceed to the next queue elif status == QueueStatus.READY: # Push elements which are ready to be processed immediately into the queue - heapq.heappush(self._ready_queue, (element._depth, element)) + heapq.heappush(self._ready_queue, (element._depth, self._queued_elements, element)) + self._queued_elements += 1 else: # Register a queue specific callback for pending elements self.register_pending_element(element)
