This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch testing/local-cache-expiry
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 034a5a4929242046458caf4c2a21ecd73969ccaf
Author: Tristan Maat <[email protected]>
AuthorDate: Wed Jul 11 10:53:07 2018 +0100

    scheduler.py: Introduce Resources
---
 buildstream/_scheduler/scheduler.py | 146 +++++++++++++++++-------------------
 1 file changed, 70 insertions(+), 76 deletions(-)

diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 7e4b985..ee226cf 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -27,6 +27,8 @@ import datetime
 from contextlib import contextmanager
 
 # Local imports
+from .resources import Resources, ResourceType
+from .jobs import CacheSizeJob, CleanupJob
 
 
 # A decent return code for Scheduler.run()
@@ -69,8 +71,8 @@ class Scheduler():
         #
         # Public members
         #
-        self.waiting_jobs = []      # Jobs waiting for execution
         self.active_jobs = []       # Jobs currently being run in the scheduler
+        self.waiting_jobs = []      # Jobs waiting for resources
         self.queues = None          # Exposed for the frontend to print summaries
         self.context = context      # The Context object shared with Queues
         self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
@@ -92,6 +94,9 @@ class Scheduler():
         self._suspendtime = None
         self._queue_jobs = True      # Whether we should continue to queue jobs
 
+        self._resources = Resources(context.sched_builders,
+                                    context.sched_fetchers,
+                                    context.sched_pushers)
 
     # run()
     #
@@ -124,7 +129,7 @@ class Scheduler():
         self._connect_signals()
 
         # Run the queues
-        self.schedule_queue_jobs()
+        self._schedule_queue_jobs()
         self.loop.run_forever()
         self.loop.close()
 
@@ -204,19 +209,50 @@ class Scheduler():
             starttime = timenow
         return timenow - starttime
 
-    # sched()
+    # schedule_jobs()
     #
-    # The main driving function of the scheduler, it will be called
-    # automatically when Scheduler.run() is called initially, and needs
-    # to be called whenever a job can potentially be scheduled, usually
-    # when a Queue completes handling of a job.
+    # Args:
+    #     jobs ([Job]): A list of jobs to schedule
     #
-    # This will process the Queues and pull elements through the Queues
-    # and process anything that is ready.
+    # Schedule 'Job's for the scheduler to run. Jobs scheduled will be
+    # run as soon as any other queueing jobs finish, provided sufficient
+    # resources are available for them to run
+    #
+    def schedule_jobs(self, jobs):
+        for job in jobs:
+            self.waiting_jobs.append(job)
+
+    # job_completed():
+    #
+    # Called when a Job completes
+    #
+    # Args:
+    #    queue (Queue): The Queue holding a complete job
+    #    job (Job): The completed Job
+    #    success (bool): Whether the Job completed with a success status
+    #
+    def job_completed(self, job, success):
+        self._resources.clear_job_resources(job)
+        self.active_jobs.remove(job)
+        self._job_complete_callback(job, success)
+        self._schedule_queue_jobs()
+        self._sched()
+
+    #######################################################
+    #                  Local Private Methods              #
+    #######################################################
+
+    # _sched()
     #
-    def sched(self):
+    # The main driving function of the scheduler, it will be called
+    # automatically when Scheduler.run() is called initially.
+    #
+    def _sched(self):
         for job in self.waiting_jobs:
+            self._resources.reserve_exclusive_resources(job)
 
+        for job in self.waiting_jobs:
+            if not self._resources.reserve_job_resources(job):
                 continue
 
             job.spawn()
@@ -230,10 +266,16 @@ class Scheduler():
         if not self.active_jobs and not self.waiting_jobs:
             self.loop.stop()
 
-    def schedule_jobs(self, jobs):
-        self.waiting_jobs.extend(jobs)
-
-    def schedule_queue_jobs(self):
+    # _schedule_queue_jobs()
+    #
+    # Ask the queues what jobs they want to schedule and schedule
+    # them. This is done here so we can ask for new jobs when jobs
+    # from previous queues become available.
+    #
+    # This will process the Queues, pull elements through the Queues
+    # and process anything that is ready.
+    #
+    def _schedule_queue_jobs(self):
         ready = []
         process_queues = True
 
@@ -259,68 +301,23 @@ class Scheduler():
             # thus need all the pulls to complete before ever starting
             # a build
             ready.extend(chain.from_iterable(
-                queue.process_ready() for queue in reversed(self.queues)
+                queue.pop_ready_jobs() for queue in reversed(self.queues)
             ))
 
-            # process_ready() may have skipped jobs, adding them to
+            # pop_ready_jobs() may have skipped jobs, adding them to
             # the done_queue.  Pull these skipped elements forward to
             # the next queue and process them.
             process_queues = any(q.dequeue_ready() for q in self.queues)
 
         self.schedule_jobs(ready)
-        self.sched()
+        self._sched()
 
-    # job_completed():
-    #
-    # Called when a Job completes
-    #
-    # Args:
-    #    queue (Queue): The Queue holding a complete job
-    #    job (Job): The completed Job
-    #    success (bool): Whether the Job completed with a success status
-    #
-    def job_completed(self, job, success):
-        self.active_jobs.remove(job)
-        self.schedule_queue_jobs()
 
-        # Notify frontend
-        if self._job_complete_callback:
-            self._job_complete_callback(job, success)
 
-    # get_job_token():
-    #
-    # Used by the Queue object to obtain a token for
-    # processing a Job, if a Queue does not receive a token
-    # then it must wait until a later time in order to
-    # process pending jobs.
-    #
-    # Args:
-    #    queue_type (QueueType): The type of token to obtain
-    #
-    # Returns:
-    #    (bool): Whether a token was handed out or not
-    #
-    def get_job_token(self, queue_type):
-        if self._job_tokens[queue_type] > 0:
-            self._job_tokens[queue_type] -= 1
-            return True
-        return False
+        self.schedule_jobs([job])
 
-    # put_job_token():
-    #
-    # Return a job token to the scheduler. Tokens previously
-    # received with get_job_token() must be returned to
-    # the scheduler once the associated job is complete.
-    #
-    # Args:
-    #    queue_type (QueueType): The type of token to obtain
-    #
-    def put_job_token(self, queue_type):
-        self._job_tokens[queue_type] += 1
 
-    #######################################################
-    #                  Local Private Methods              #
-    #######################################################
+        self.schedule_jobs([job])
 
     def _run_cleanup(self, cache_size):
         if cache_size and cache_size < self.context.cache_quota:
@@ -349,9 +346,8 @@ class Scheduler():
         if not self.suspended:
             self._suspendtime = datetime.datetime.now()
             self.suspended = True
-            for queue in self.queues:
-                for job in queue.active_jobs:
-                    job.suspend()
+            for job in self.active_jobs:
+                job.suspend()
 
     # _resume_jobs()
     #
@@ -359,9 +355,8 @@ class Scheduler():
     #
     def _resume_jobs(self):
         if self.suspended:
-            for queue in self.queues:
-                for job in queue.active_jobs:
-                    job.resume()
+            for job in self.active_jobs:
+                job.resume()
             self.suspended = False
             self._starttime += (datetime.datetime.now() - self._suspendtime)
             self._suspendtime = None
@@ -423,21 +418,20 @@ class Scheduler():
         wait_start = datetime.datetime.now()
         wait_limit = 20.0
 
-        active_jobs = self.active_jobs
-        for queue in self.queues:
-            active_jobs.extend(queue.active_jobs)
-
         # First tell all jobs to terminate
-        for job in active_jobs:
+        for job in self.active_jobs:
             job.terminate()
 
         # Now wait for them to really terminate
-        for job in active_jobs:
+        for job in self.active_jobs:
             elapsed = datetime.datetime.now() - wait_start
             timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
             if not job.terminate_wait(timeout):
                 job.kill()
 
+        # Clear out the waiting jobs
+        self.waiting_jobs = []
+
     # Regular timeout for driving status in the UI
     def _tick(self):
         elapsed = self.elapsed_time()

Reply via email to