This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch testing/local-cache-expiry
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 4fcd69510182f182d980a2e232660e0bf906d883
Author: Tristan Maat <[email protected]>
AuthorDate: Wed Jul 11 10:55:00 2018 +0100

    queue.py: Introduce Resources
---
 buildstream/_scheduler/queues/queue.py | 92 ++++++++++++++++------------------
 1 file changed, 42 insertions(+), 50 deletions(-)

diff --git a/buildstream/_scheduler/queues/queue.py 
b/buildstream/_scheduler/queues/queue.py
index 2ff10d8..ac20d37 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -26,26 +26,13 @@ import traceback
 
 # Local imports
 from ..jobs import ElementJob
+from ..resources import ResourceType
 
 # BuildStream toplevel imports
 from ..._exceptions import BstError, set_last_task_error
 from ..._message import Message, MessageType
 
 
-# Indicates the kind of activity
-#
-#
-class QueueType():
-    # Tasks which download stuff from the internet
-    FETCH = 1
-
-    # CPU/Disk intensive tasks
-    BUILD = 2
-
-    # Tasks which upload stuff to the internet
-    PUSH = 3
-
-
 # Queue status for a given element
 #
 #
@@ -70,15 +57,13 @@ class Queue():
     # These should be overridden on class data of concrete Queue implementations
     action_name = None
     complete_name = None
-    queue_type = None
-    job_type = None
+    resources = []                     # Resources this queue's jobs want
 
     def __init__(self, scheduler):
 
         #
         # Public members
         #
-        self.active_jobs = []          # List of active ongoing Jobs, for 
scheduler observation
         self.failed_elements = []      # List of failed elements, for the 
frontend
         self.processed_elements = []   # List of processed elements, for the 
frontend
         self.skipped_elements = []     # List of skipped elements, for the 
frontend
@@ -90,13 +75,13 @@ class Queue():
         self._wait_queue = deque()
         self._done_queue = deque()
         self._max_retries = 0
-        if self.queue_type == QueueType.FETCH or self.queue_type == 
QueueType.PUSH:
-            self._max_retries = scheduler.context.sched_network_retries
 
         # Assert the subclass has setup class data
         assert self.action_name is not None
         assert self.complete_name is not None
-        assert self.queue_type is not None
+
+        if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in 
self.resources:
+            self._max_retries = scheduler.context.sched_network_retries
 
     #####################################################
     #     Abstract Methods for Queue implementations    #
@@ -173,10 +158,22 @@ class Queue():
         if not elts:
             return
 
+        # Note: The internal lists work with jobs. This is not
+        #       reflected in any external methods (except
+        #       pop/peek_ready_jobs).
+        def create_job(element):
+            logfile = self._element_log_path(element)
+            return ElementJob(self._scheduler, self.action_name,
+                              logfile, element=element, queue=self,
+                              resources=self.resources,
+                              action_cb=self.process,
+                              complete_cb=self._job_done,
+                              max_retries=self._max_retries)
+
         # Place skipped elements directly on the done queue
-        elts = list(elts)
-        skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
-        wait = [elt for elt in elts if elt not in skip]
+        jobs = [create_job(elt) for elt in elts]
+        skip = [job for job in jobs if self.status(job.element) == 
QueueStatus.SKIP]
+        wait = [job for job in jobs if job not in skip]
 
         self._wait_queue.extend(wait)
         self._done_queue.extend(skip)
@@ -192,7 +189,7 @@ class Queue():
     #
     def dequeue(self):
         while self._done_queue:
-            yield self._done_queue.popleft()
+            yield self._done_queue.popleft().element
 
     # dequeue_ready()
     #
@@ -204,7 +201,10 @@ class Queue():
     def dequeue_ready(self):
         return any(self._done_queue)
 
-    # process_ready()
+    # pop_ready_jobs()
+    #
+    # Returns:
+    #     ([Job]): A list of jobs to run
     #
     # Process elements in the queue, moving elements which were enqueued
     # into the dequeue pool, and processing them if necessary.
@@ -214,42 +214,31 @@ class Queue():
     #
     #   o Elements which are QueueStatus.WAIT will not be affected
     #
-    #   o Elements which are QueueStatus.READY will be processed
-    #     and added to the Queue.active_jobs list as a result,
-    #     given that the scheduler allows the Queue enough tokens
-    #     for the given queue's job type
-    #
     #   o Elements which are QueueStatus.SKIP will move directly
     #     to the dequeue pool
     #
-    def process_ready(self):
-        scheduler = self._scheduler
+    #   o For Elements which are QueueStatus.READY a Job will be
+    #     created and returned to the caller, given that the scheduler
+    #     allows the Queue enough resources for the given job
+    #
+    def pop_ready_jobs(self):
         unready = []
         ready = []
 
-        while self._wait_queue and scheduler.get_job_token(self.queue_type):
-            element = self._wait_queue.popleft()
+        while self._wait_queue:
+            job = self._wait_queue.popleft()
+            element = job.element
 
             status = self.status(element)
             if status == QueueStatus.WAIT:
-                scheduler.put_job_token(self.queue_type)
-                unready.append(element)
+                unready.append(job)
                 continue
             elif status == QueueStatus.SKIP:
-                scheduler.put_job_token(self.queue_type)
-                self._done_queue.append(element)
+                self._done_queue.append(job)
                 self.skipped_elements.append(element)
                 continue
 
-            logfile = self._element_log_path(element)
             self.prepare(element)
-
-            job = ElementJob(scheduler, self.job_type,
-                             self.action_name, logfile,
-                             element=element, queue=self,
-                             action_cb=self.process,
-                             complete_cb=self._job_done,
-                             max_retries=self._max_retries)
             ready.append(job)
 
         # These were not ready but were in the beginning, give em
@@ -258,6 +247,12 @@ class Queue():
 
         return ready
 
+    def peek_ready_jobs(self):
+        def ready(job):
+            return self.status(job.element) == QueueStatus.READY
+
+        yield from (job for job in self._wait_queue if ready(job))
+
     #####################################################
     #                 Private Methods                   #
     #####################################################
@@ -341,7 +336,7 @@ class Queue():
             # No exception occurred, handle the success/failure state in the 
 normal way
             #
             if success:
-                self._done_queue.append(element)
+                self._done_queue.append(job)
                 if processed:
                     self.processed_elements.append(element)
                 else:
@@ -349,9 +344,6 @@ class Queue():
             else:
                 self.failed_elements.append(element)
 
-        # Give the token for this job back to the scheduler
-        self._scheduler.put_job_token(self.queue_type)
-
     # Convenience wrapper for Queue implementations to send
     # a message for the element they are processing
     def _message(self, element, message_type, brief, **kwargs):

Reply via email to