This is an automated email from the ASF dual-hosted git repository. akitouni pushed a commit to branch abderrahim/simplify-jobs in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit a1690517044a1f27e0e0d0eb6fb0b78750d129f4 Author: Abderrahim Kitouni <[email protected]> AuthorDate: Mon Jul 25 15:00:29 2022 +0200 job.py: merge ChildJob into Job We no longer need the parent/child separation --- src/buildstream/_scheduler/jobs/elementjob.py | 12 +- src/buildstream/_scheduler/jobs/job.py | 186 +++++--------------------- 2 files changed, 31 insertions(+), 167 deletions(-) diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py index 3d04beba9..28bb65d80 100644 --- a/src/buildstream/_scheduler/jobs/elementjob.py +++ b/src/buildstream/_scheduler/jobs/elementjob.py @@ -16,7 +16,7 @@ # Tristan Daniël Maat <[email protected]> # -from .job import Job, ChildJob +from .job import Job # ElementJob() @@ -76,16 +76,6 @@ class ElementJob(Job): def parent_complete(self, status, result): self._complete_cb(self, self._element, status, self._result) - def create_child_job(self, *args, **kwargs): - return ChildElementJob(*args, element=self._element, action_cb=self._action_cb, **kwargs) - - -class ChildElementJob(ChildJob): - def __init__(self, *args, element, action_cb, **kwargs): - super().__init__(*args, **kwargs) - self._element = element - self._action_cb = action_cb - def child_process(self): # Run the action diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 08bdbcd9d..c57f8d29f 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -67,23 +67,6 @@ class JobStatus(FastEnum): # process. It has some methods that are not implemented - they are meant for # you to implement in a subclass. # -# It has a close relationship with the ChildJob class, and it can be considered -# a two part solution: -# -# 1. A Job instance, which will create a ChildJob instance and arrange for -# childjob.child_process() to be executed in another process. -# 2. The created ChildJob instance, which does the actual work. 
-# -# This split makes it clear what data is passed to the other process and what -# is executed in which process. -# -# To set up a minimal new kind of Job, e.g. YourJob: -# -# 1. Create a YourJob class, inheriting from Job. -# 2. Create a YourChildJob class, inheriting from ChildJob. -# 3. Implement YourJob.create_child_job() and YourJob.parent_complete(). -# 4. Implement YourChildJob.child_process(). -# # Args: # scheduler (Scheduler): The scheduler # action_name (str): The queue action name @@ -123,7 +106,10 @@ class Job: self._element = None # The Element() passed to the Job() constructor, if applicable self._task = None # The task that is run - self._child = None + + self._thread_id = None # Thread in which the child executes its action + self._should_terminate = False + self._terminate_lock = threading.Lock() # set_name() # @@ -141,22 +127,10 @@ class Job: self._tries += 1 - # FIXME: remove the parent/child separation, it's not needed anymore. - self._child = self.create_child_job( # pylint: disable=assignment-from-no-return - self.action_name, - self._messenger, - self._scheduler.context.logdir, - self._logfile, - self._max_retries, - self._tries, - self._message_element_name, - self._message_element_key, - ) - loop = asyncio.get_event_loop() async def execute(): - ret_code, self._result = await loop.run_in_executor(None, self._child.child_action) + ret_code, self._result = await loop.run_in_executor(None, self.child_action) await self._parent_child_completed(ret_code) self._task = loop.create_task(execute()) @@ -171,8 +145,17 @@ class Job: self.message(MessageType.STATUS, "{} terminating".format(self.action_name)) if self._task: - self._child.terminate() + assert utils._is_in_main_thread(), "Terminating the job's thread should only be done from the scheduler" + + if self._should_terminate: + return + + with self._terminate_lock: + self._should_terminate = True + if self._thread_id is None: + return + terminate_thread(self._thread_id) self._terminated = 
True # get_terminated() @@ -244,6 +227,20 @@ class Job: # Abstract Methods # ####################################################### + # child_process() + # + # This will be executed after starting the child process, and is intended + # to perform the job's task. + # + # Returns: + # (any): A simple object (must be pickle-able, i.e. strings, lists, + # dicts, numbers, but not Element instances). It is returned to + # the parent Job running in the main process. This is taken as + # the result of the Job. + # + def child_process(self): + raise ImplError("Job '{kind}' does not implement child_process()".format(kind=type(self).__name__)) + # parent_complete() # # This will be executed in the main process after the job finishes, and is @@ -256,24 +253,6 @@ class Job: def parent_complete(self, status, result): raise ImplError("Job '{kind}' does not implement parent_complete()".format(kind=type(self).__name__)) - # create_child_job() - # - # Called by a Job instance to create a child job. - # - # The child job object is an instance of a subclass of ChildJob. - # - # The child job object's child_process() method will be executed in another - # process, so that work is done in parallel. See the documentation for the - # Job class for more information on this relationship. - # - # This method must be overridden by Job subclasses. - # - # Returns: - # (ChildJob): An instance of a subclass of ChildJob. - # - def create_child_job(self, *args, **kwargs): - raise ImplError("Job '{kind}' does not implement create_child_job()".format(kind=type(self).__name__)) - ####################################################### # Local Private Methods # ####################################################### @@ -326,92 +305,6 @@ class Job: self._scheduler.job_completed(self, status) self._task = None - -# ChildJob() -# -# The ChildJob object represents the part of a parallel task that will run in a -# separate process. It has a close relationship with the parent Job that -# created it. 
-# -# See the documentation of the Job class for more on their relationship, and -# how to set up a (Job, ChildJob pair). -# -# The args below are passed from the parent Job to the ChildJob. -# -# Args: -# scheduler (Scheduler): The scheduler. -# action_name (str): The queue action name. -# logfile (str): A template string that points to the logfile -# that should be used - should contain {pid}. -# max_retries (int): The maximum number of retries. -# tries (int): The number of retries so far. -# message_element_name (str): None, or the plugin instance element name -# to be supplied to the Message() constructor. -# message_element_key (tuple): None, or the element display key tuple -# to be supplied to the Message() constructor. -# -class ChildJob: - def __init__( - self, action_name, messenger, logdir, logfile, max_retries, tries, message_element_name, message_element_key - ): - - self.action_name = action_name - - self._messenger = messenger - self._logdir = logdir - self._logfile = logfile - self._max_retries = max_retries - self._tries = tries - self._message_element_name = message_element_name - self._message_element_key = message_element_key - - self._thread_id = None # Thread in which the child executes its action - self._should_terminate = False - self._terminate_lock = threading.Lock() - - # message(): - # - # Logs a message, this will be logged in the task's logfile and - # conditionally also be sent to the frontend. - # - # Args: - # message_type (MessageType): The type of message to send - # message (str): The message - # kwargs: Remaining Message() constructor arguments, note - # element_key is set in _child_message_handler - # for front end display if not already set or explicitly - # overriden here. 
- # - def message(self, message_type, message, **kwargs): - kwargs["scheduler"] = True - self._messenger.message( - Message( - message_type, - message, - element_name=self._message_element_name, - element_key=self._message_element_key, - **kwargs - ) - ) - - ####################################################### - # Abstract Methods # - ####################################################### - - # child_process() - # - # This will be executed after starting the child process, and is intended - # to perform the job's task. - # - # Returns: - # (any): A simple object (must be pickle-able, i.e. strings, lists, - # dicts, numbers, but not Element instances). It is returned to - # the parent Job running in the main process. This is taken as - # the result of the Job. - # - def child_process(self): - raise ImplError("ChildJob '{kind}' does not implement child_process()".format(kind=type(self).__name__)) - # child_action() # # Perform the action in the child process, this calls the action_cb. @@ -426,7 +319,7 @@ class ChildJob: # Time, log and and run the action function # with self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages( - self._logfile, self._logdir + self._logfile, self._scheduler.context.logdir ) as filename: try: self.message(MessageType.START, self.action_name, logfile=filename) @@ -499,22 +392,3 @@ class ChildJob: except TerminateException: self._thread_id = None return _ReturnCode.TERMINATED, None - - # terminate() - # - # Ask the the current child thread to terminate - # - # This should only ever be called from the main thread. - # - def terminate(self): - assert utils._is_in_main_thread(), "Terminating the job's thread should only be done from the scheduler" - - if self._should_terminate: - return - - with self._terminate_lock: - self._should_terminate = True - if self._thread_id is None: - return - - terminate_thread(self._thread_id)
