This is an automated email from the ASF dual-hosted git repository. root pushed a commit to branch testing/local-cache-expiry in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit bc4cc296380f165da8eeb6d335ef6cb5b1f4f106 Author: Tristan Maat <[email protected]> AuthorDate: Sun Jun 24 22:14:49 2018 +0000 buildstream/_scheduler/*.py: Make job submission a queue job --- buildstream/_scheduler/jobs/job.py | 15 ++- buildstream/_scheduler/queues/buildqueue.py | 3 + buildstream/_scheduler/queues/fetchqueue.py | 2 + buildstream/_scheduler/queues/pullqueue.py | 2 + buildstream/_scheduler/queues/pushqueue.py | 2 + buildstream/_scheduler/queues/queue.py | 14 +-- buildstream/_scheduler/queues/trackqueue.py | 2 + buildstream/_scheduler/scheduler.py | 179 ++++++++++++++++++++-------- 8 files changed, 154 insertions(+), 65 deletions(-) diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index c567b6f..84be452 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -52,6 +52,14 @@ class Process(multiprocessing.Process): self._sentinel = self._popen.sentinel +class JobType(): + FETCH = 1 + TRACK = 2 + BUILD = 3 + PULL = 4 + PUSH = 5 + + # Job() # # The Job object represents a parallel task, when calling Job.spawn(), @@ -61,6 +69,7 @@ class Process(multiprocessing.Process): # # Args: # scheduler (Scheduler): The scheduler +# job_type (QueueType): The type of the job # action_name (str): The queue action name # logfile (str): A template string that points to the logfile # that should be used - should contain {pid}. @@ -68,13 +77,14 @@ class Process(multiprocessing.Process): # class Job(): - def __init__(self, scheduler, action_name, logfile, *, max_retries=0): + def __init__(self, scheduler, job_type, action_name, logfile, *, max_retries=0): # # Public members # self.action_name = action_name # The action name for the Queue - self.child_data = None + self.child_data = None # Data to be sent to the main process + self.job_type = job_type # The type of the job # # Private members @@ -541,6 +551,7 @@ class Job(): return self._parent_complete(returncode == 0, self._result) + self._scheduler.job_completed(self) # _parent_process_envelope() # diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index 50ba312..0c75538 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -18,7 +18,9 @@ # Tristan Van Berkom <[email protected]> # Jürg Billeter <[email protected]> +import os from . import Queue, QueueStatus, QueueType +from ..jobs import JobType # A queue which assembles elements @@ -28,6 +30,7 @@ class BuildQueue(Queue): action_name = "Build" complete_name = "Built" queue_type = QueueType.BUILD + job_type = JobType.BUILD def process(self, element): element._assemble() diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index bdff156..2438a9a 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -23,6 +23,7 @@ from ... import Consistency # Local imports from . import Queue, QueueStatus, QueueType +from ..jobs import JobType # A queue which fetches element sources @@ -32,6 +33,7 @@ class FetchQueue(Queue): action_name = "Fetch" complete_name = "Fetched" queue_type = QueueType.FETCH + job_type = JobType.FETCH def __init__(self, scheduler, skip_cached=False): super().__init__(scheduler) diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py index b4f5b0d..1fc4364 100644 --- a/buildstream/_scheduler/queues/pullqueue.py +++ b/buildstream/_scheduler/queues/pullqueue.py @@ -20,6 +20,7 @@ # Local imports from . import Queue, QueueStatus, QueueType +from ..jobs import JobType # A queue which pulls element artifacts @@ -29,6 +30,7 @@ class PullQueue(Queue): action_name = "Pull" complete_name = "Pulled" queue_type = QueueType.FETCH + job_type = JobType.PULL def process(self, element): # returns whether an artifact was downloaded or not diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py index 624eefd..aa5540e 100644 --- a/buildstream/_scheduler/queues/pushqueue.py +++ b/buildstream/_scheduler/queues/pushqueue.py @@ -20,6 +20,7 @@ # Local imports from . import Queue, QueueStatus, QueueType +from ..jobs import JobType # A queue which pushes element artifacts @@ -29,6 +30,7 @@ class PushQueue(Queue): action_name = "Push" complete_name = "Pushed" queue_type = QueueType.PUSH + job_type = JobType.PUSH def process(self, element): # returns whether an artifact was uploaded or not diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 7f115b4..39e431b 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -71,6 +71,7 @@ class Queue(): action_name = None complete_name = None queue_type = None + job_type = None def __init__(self, scheduler): @@ -241,16 +242,14 @@ class Queue(): logfile = self._element_log_path(element) self.prepare(element) - job = ElementJob(scheduler, self.action_name, - logfile, element=element, + job = ElementJob(scheduler, self.job_type, + self.action_name, logfile, + element=element, queue=self, action_cb=self.process, complete_cb=self._job_done, max_retries=self._max_retries) scheduler.job_starting(job, element) - job.spawn() - self.active_jobs.append(job) - # These were not ready but were in the beginning, give em # first priority again next time around self._wait_queue.extendleft(unready) @@ -350,11 +349,6 @@ class Queue(): # immediately before invoking another round of scheduling self._scheduler.put_job_token(self.queue_type) - # Notify frontend - self._scheduler.job_completed(self, job, element, success) - - self._scheduler.sched() - # Convenience wrapper for Queue implementations to send # a message for the element they are processing def _message(self, element, message_type, brief, **kwargs): diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py index 3a65f01..a371e52 100644 --- a/buildstream/_scheduler/queues/trackqueue.py +++ b/buildstream/_scheduler/queues/trackqueue.py @@ -24,6 +24,7 @@ from ... import SourceError # Local imports from . import Queue, QueueStatus, QueueType +from ..jobs import JobType # A queue which tracks sources @@ -33,6 +34,7 @@ class TrackQueue(Queue): action_name = "Track" complete_name = "Tracked" queue_type = QueueType.FETCH + job_type = JobType.TRACK def process(self, element): return element._track() diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index a7a3f95..ffbd656 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -21,11 +21,13 @@ # System imports import os import asyncio +from itertools import chain import signal import datetime from contextlib import contextmanager # Local imports +from .jobs import JobType from .queues import QueueType @@ -36,6 +38,59 @@ class SchedStatus(): TERMINATED = 1 +# A set of rules that dictates in which order jobs should run. +# +# The first tuple defines jobs that are not allowed to be executed +# before the current job completes (even if the job is still waiting +# to be executed). +# +# The second tuple defines jobs that the current job is not allowed to +# be run in parallel with. +# +# Note that this is very different from the element job +# dependencies. Both a build and fetch job can be ready at the same +# time, this has nothing to do with the requirement to fetch sources +# before building. These rules are purely in place to maintain cache +# consistency. +# +JOB_RULES = { + JobType.CLEAN: { + # Build and pull jobs are not allowed to run when we are about + # to start a cleanup job, because they will add more data to + # the artifact cache. + 'priority': (JobType.BUILD, JobType.PULL), + # Cleanup jobs are not allowed to run in parallel with any + # jobs that might need to access the artifact cache, because + # we cannot guarantee atomicity otherwise. + 'exclusive': (JobType.BUILD, JobType.PULL, JobType.PUSH) + }, + JobType.BUILD: { + 'priority': (), + 'exclusive': () + }, + JobType.FETCH: { + 'priority': (), + 'exclusive': () + }, + JobType.PULL: { + 'priority': (), + 'exclusive': () + }, + JobType.PUSH: { + 'priority': (), + 'exclusive': () + }, + JobType.SIZE: { + 'priority': (), + 'exclusive': () + }, + JobType.TRACK: { + 'priority': (), + 'exclusive': () + } +} + + # Scheduler() # # The scheduler operates on a list queues, each of which is meant to accomplish @@ -69,6 +124,8 @@ class Scheduler(): # # Public members # + self.waiting_jobs = [] # Jobs waiting for execution + self.active_jobs = [] # Jobs currently being run in the scheduler self.queues = None # Exposed for the frontend to print summaries self.context = context # The Context object shared with Queues self.terminated = False # Whether the scheduler was asked to terminate or has terminated @@ -129,7 +186,7 @@ class Scheduler(): self._connect_signals() # Run the queues - self.sched() + self.schedule_queue_jobs() self.loop.run_forever() self.loop.close() @@ -220,11 +277,38 @@ class Scheduler(): # and process anything that is ready. # def sched(self): + for job in self.waiting_jobs: + # If our job is not allowed to run with any job currently + # running, we don't start it. + if any(running_job.job_type in JOB_RULES[job.job_type]['exclusive'] + for running_job in self.active_jobs): + continue + + # If any job currently waiting has priority over this one, + # we don't start it. + if any(job.job_type in JOB_RULES[waiting_job.job_type]['priority'] + for waiting_job in self.waiting_jobs): + continue + + job.spawn() + self.waiting_jobs.remove(job) + self.active_jobs.append(job) + + if self._job_start_callback: + self._job_start_callback(job) + + # If nothings ticking, time to bail out + if not self.active_jobs and not self.waiting_jobs: + self.loop.stop() + + def schedule_jobs(self, jobs): + self.waiting_jobs.extend(jobs) + def schedule_queue_jobs(self): + ready = [] process_queues = True while self._queue_jobs and process_queues: - # Pull elements forward through queues elements = [] for queue in self.queues: @@ -233,31 +317,42 @@ class Scheduler(): # Dequeue processed elements for the next queue elements = list(queue.dequeue()) - elements = list(elements) # Kickoff whatever processes can be processed at this time # - # We start by queuing from the last queue first, because we want to - # give priority to queues later in the scheduling process in the case - # that multiple queues share the same token type. + # We start by queuing from the last queue first, because + # we want to give priority to queues later in the + # scheduling process in the case that multiple queues + # share the same token type. # - # This avoids starvation situations where we dont move on to fetch - # tasks for elements which failed to pull, and thus need all the pulls - # to complete before ever starting a build - for queue in reversed(self.queues): - queue.process_ready() - - # process_ready() may have skipped jobs, adding them to the done_queue. - # Pull these skipped elements forward to the next queue and process them. + # This avoids starvation situations where we dont move on + # to fetch tasks for elements which failed to pull, and + # thus need all the pulls to complete before ever starting + # a build + ready.extend(chain.from_iterable( + queue.process_ready() for queue in reversed(self.queues) + )) + + # process_ready() may have skipped jobs, adding them to + # the done_queue. Pull these skipped elements forward to + # the next queue and process them. process_queues = any(q.dequeue_ready() for q in self.queues) - # If nothings ticking, time to bail out - ticking = 0 - for queue in self.queues: - ticking += len(queue.active_jobs) + self.schedule_jobs(ready) + self.sched() - if ticking == 0: - self.loop.stop() + # job_completed(): + # + # Called when a Job completes + # + # Args: + # queue (Queue): The Queue holding a complete job + # job (Job): The completed Job + # success (bool): Whether the Job completed with a success status + # + def job_completed(self, job): + self.active_jobs.remove(job) + self.schedule_queue_jobs() # get_job_token(): # @@ -290,30 +385,6 @@ class Scheduler(): def put_job_token(self, queue_type): self._job_tokens[queue_type] += 1 - # job_starting(): - # - # Called by the Queue when starting a Job - # - # Args: - # job (Job): The starting Job - # - def job_starting(self, job, element): - if self._job_start_callback: - self._job_start_callback(element, job.action_name) - - # job_completed(): - # - # Called by the Queue when a Job completes - # - # Args: - # queue (Queue): The Queue holding a complete job - # job (Job): The completed Job - # success (bool): Whether the Job completed with a success status - # - def job_completed(self, queue, job, element, success): - if self._job_complete_callback: - self._job_complete_callback(element, queue, job.action_name, success) - ####################################################### # Local Private Methods # ####################################################### @@ -400,18 +471,20 @@ class Scheduler(): wait_start = datetime.datetime.now() wait_limit = 20.0 - # First tell all jobs to terminate + active_jobs = self.active_jobs for queue in self.queues: - for job in queue.active_jobs: - job.terminate() + active_jobs.extend(queue.active_jobs) + + # First tell all jobs to terminate + for job in active_jobs: + job.terminate() # Now wait for them to really terminate - for queue in self.queues: - for job in queue.active_jobs: - elapsed = datetime.datetime.now() - wait_start - timeout = max(wait_limit - elapsed.total_seconds(), 0.0) - if not job.terminate_wait(timeout): - job.kill() + for job in active_jobs: + elapsed = datetime.datetime.now() - wait_start + timeout = max(wait_limit - elapsed.total_seconds(), 0.0) + if not job.terminate_wait(timeout): + job.kill() self.loop.stop()
