This is an automated email from the ASF dual-hosted git repository. akitouni pushed a commit to branch abderrahim/simplify-jobs in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit c25fae052d726afa0056e90fa3eb50bf9189c95e Author: Abderrahim Kitouni <[email protected]> AuthorDate: Tue Jul 26 14:25:48 2022 +0200 job.py: merge ElementJob into Job ElementJob was the only subclass, since the other jobs are now taken care of by buildbox-casd --- src/buildstream/_scheduler/__init__.py | 2 +- src/buildstream/_scheduler/jobs/__init__.py | 3 +- src/buildstream/_scheduler/jobs/elementjob.py | 82 ------------------ src/buildstream/_scheduler/jobs/job.py | 118 ++++++++++---------------- src/buildstream/_scheduler/queues/queue.py | 5 +- 5 files changed, 47 insertions(+), 163 deletions(-) diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py index d014c1788..c6dbe3642 100644 --- a/src/buildstream/_scheduler/__init__.py +++ b/src/buildstream/_scheduler/__init__.py @@ -27,4 +27,4 @@ from .queues.pullqueue import PullQueue from .queues.cachequeryqueue import CacheQueryQueue from .scheduler import Scheduler, SchedStatus -from .jobs import ElementJob, JobStatus +from .jobs import Job, JobStatus diff --git a/src/buildstream/_scheduler/jobs/__init__.py b/src/buildstream/_scheduler/jobs/__init__.py index 3de09a475..68dedc8b0 100644 --- a/src/buildstream/_scheduler/jobs/__init__.py +++ b/src/buildstream/_scheduler/jobs/__init__.py @@ -16,5 +16,4 @@ # Authors: # Tristan Maat <[email protected]> -from .elementjob import ElementJob -from .job import JobStatus +from .job import Job, JobStatus diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py deleted file mode 100644 index 28bb65d80..000000000 --- a/src/buildstream/_scheduler/jobs/elementjob.py +++ /dev/null @@ -1,82 +0,0 @@ -# Copyright (C) 2018 Codethink Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Author: -# Tristan Daniƫl Maat <[email protected]> -# - -from .job import Job - - -# ElementJob() -# -# A job to run an element's commands. When this job is started -# `action_cb` will be called, and when it completes `complete_cb` will -# be called. -# -# Args: -# scheduler (Scheduler): The scheduler -# action_name (str): The queue action name -# max_retries (int): The maximum number of retries -# action_cb (callable): The function to execute on the child -# complete_cb (callable): The function to execute when the job completes -# element (Element): The element to work on -# kwargs: Remaining Job() constructor arguments -# -# Here is the calling signature of the action_cb: -# -# action_cb(): -# -# This function will be called in the child task -# -# Args: -# element (Element): The element passed to the Job() constructor -# -# Returns: -# (object): Any abstract simple python object, including a string, int, -# bool, list or dict, this must be a simple serializable object. -# -# Here is the calling signature of the complete_cb: -# -# complete_cb(): -# -# This function will be called when the child task completes -# -# Args: -# job (Job): The job object which completed -# element (Element): The element passed to the Job() constructor -# status (JobStatus): The status of whether the workload raised an exception -# result (object): The deserialized object returned by the `action_cb`, or None -# if `success` is False -# -class ElementJob(Job): - def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs): - super().__init__(*args, **kwargs) - self.set_name(element._get_full_name()) - self.queue = queue - self._element = element # Set the Element pertaining to the job - self._action_cb = action_cb # The action callable function - self._complete_cb = complete_cb # The complete callable function - - # Set the plugin element name & key for logging purposes - self.set_message_element_name(self.name) - self.set_message_element_key(self._element._get_display_key()) - - def parent_complete(self, status, result): - self._complete_cb(self, self._element, status, self._result) - - def child_process(self): - - # Run the action - return self._action_cb(self._element) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index c57f8d29f..90fa1765c 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -72,21 +72,52 @@ class JobStatus(FastEnum): # action_name (str): The queue action name # logfile (str): A template string that points to the logfile # that should be used - should contain {pid}. +# element (Element): The element to work on +# action_cb (callable): The function to execute on the child +# complete_cb (callable): The function to execute when the job completes # max_retries (int): The maximum number of retries # +# Here is the calling signature of the action_cb: +# +# action_cb(): +# +# This function will be called in the child task +# +# Args: +# element (Element): The element passed to the Job() constructor +# +# Returns: +# (object): Any abstract simple python object, including a string, int, +# bool, list or dict, this must be a simple serializable object. +# +# Here is the calling signature of the complete_cb: +# +# complete_cb(): +# +# This function will be called when the child task completes +# +# Args: +# job (Job): The job object which completed +# element (Element): The element passed to the Job() constructor +# status (JobStatus): The status of whether the workload raised an exception +# result (object): The deserialized object returned by the `action_cb`, or None +# if `success` is False +# + + class Job: # Unique id generator for jobs # # This is used to identify tasks in the `State` class _id_generator = itertools.count(1) - def __init__(self, scheduler, action_name, logfile, *, max_retries=0): + def __init__(self, scheduler, action_name, logfile, element, action_cb, complete_cb, max_retries=0): # # Public members # self.id = "{}-{}".format(action_name, next(Job._id_generator)) - self.name = None # The name of the job, set by the job's subclass + self.name = element._get_full_name() self.action_name = action_name # The action name for the Queue # @@ -101,9 +132,11 @@ class Job: self._terminated = False # Whether this job has been explicitly terminated self._logfile = logfile - self._message_element_name = None # The task-wide element name - self._message_element_key = None # The task-wide element cache key - self._element = None # The Element() passed to the Job() constructor, if applicable + self._element_name = element._get_full_name() # The task-wide element name + self._element_key = element._get_display_key() # The task-wide element cache key + self._element = element # Set the Element pertaining to the job + self._action_cb = action_cb # The action callable function + self._complete_cb = complete_cb # The complete callable function self._task = None # The task that is run @@ -111,12 +144,6 @@ class Job: self._should_terminate = False self._terminate_lock = threading.Lock() - # set_name() - # - # Sets the name of this job - def set_name(self, name): - self.name = name - # start() # # Starts the job. @@ -168,28 +195,6 @@ class Job: def get_terminated(self): return self._terminated - # set_message_element_name() - # - # This is called by Job subclasses to set the plugin instance element - # name issuing the message (if an element is related to the Job). - # - # Args: - # element_name (int): The element_name to be supplied to the Message() constructor - # - def set_message_element_name(self, element_name): - self._message_element_name = element_name - - # set_message_element_key() - # - # This is called by Job subclasses to set the element - # key for for the issuing message (if an element is related to the Job). - # - # Args: - # element_key (_DisplayKey): The element_key tuple to be supplied to the Message() constructor - # - def set_message_element_key(self, element_key): - self._message_element_key = element_key - # message(): # # Logs a message, this will be logged in the task's logfile and @@ -204,18 +209,13 @@ class Job: def message(self, message_type, message, **kwargs): kwargs["scheduler"] = True message = Message( - message_type, - message, - element_name=self._message_element_name, - element_key=self._message_element_key, - **kwargs + message_type, message, element_name=self._element_name, element_key=self._element_key, **kwargs ) self._messenger.message(message) # get_element() # - # Get the Element() related to the job, if jobtype (i.e ElementJob) is - # applicable, default None. + # Get the Element() related to the job # # Returns: # (Element): The Element() instance pertaining to the Job, else None. @@ -223,36 +223,6 @@ class Job: def get_element(self): return self._element - ####################################################### - # Abstract Methods # - ####################################################### - - # child_process() - # - # This will be executed after starting the child process, and is intended - # to perform the job's task. - # - # Returns: - # (any): A simple object (must be pickle-able, i.e. strings, lists, - # dicts, numbers, but not Element instances). It is returned to - # the parent Job running in the main process. This is taken as - # the result of the Job. - # - def child_process(self): - raise ImplError("Job '{kind}' does not implement child_process()".format(kind=type(self).__name__)) - - # parent_complete() - # - # This will be executed in the main process after the job finishes, and is - # expected to pass the result to the main thread. - # - # Args: - # status (JobStatus): The job exit status - # result (any): The result returned by child_process(). - # - def parent_complete(self, status, result): - raise ImplError("Job '{kind}' does not implement parent_complete()".format(kind=type(self).__name__)) - ####################################################### # Local Private Methods # ####################################################### @@ -301,7 +271,7 @@ class Job: else: status = JobStatus.FAIL - self.parent_complete(status, self._result) + self._complete_cb(self, self._element, status, self._result) self._scheduler.job_completed(self, status) self._task = None @@ -312,9 +282,7 @@ class Job: def child_action(self): # Set the global message handler in this child # process to forward messages to the parent process - self._messenger.setup_new_action_context( - self.action_name, self._message_element_name, self._message_element_key - ) + self._messenger.setup_new_action_context(self.action_name, self._element_name, self._element_key) # Time, log and and run the action function # @@ -331,7 +299,7 @@ class Job: try: # Try the task action - result = self.child_process() # pylint: disable=assignment-from-no-return + result = self._action_cb(self._element) except SkipJob as e: elapsed = datetime.datetime.now() - timeinfo.start_time self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename) diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 76688e75d..110f14944 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -25,7 +25,7 @@ import traceback from typing import TYPE_CHECKING # Local imports -from ..jobs import ElementJob, JobStatus +from ..jobs import Job, JobStatus from ..resources import ResourceType # BuildStream toplevel imports @@ -232,12 +232,11 @@ class Queue: ready.append(element) return [ - ElementJob( + Job( self._scheduler, self.action_name, self._element_log_path(element), element=element, - queue=self, action_cb=self.get_process_func(), complete_cb=self._job_done, max_retries=self._max_retries,
