This is an automated email from the ASF dual-hosted git repository.

akitouni pushed a commit to branch abderrahim/simplify-jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit a1690517044a1f27e0e0d0eb6fb0b78750d129f4
Author: Abderrahim Kitouni <[email protected]>
AuthorDate: Mon Jul 25 15:00:29 2022 +0200

    job.py: merge ChildJob into Job
    
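    We no longer need the parent/child separation: the job's action now runs
    in a worker thread of the same process (via the event loop's executor),
    so there is nothing left that has to be handed over to a separate
    ChildJob object.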
---
 src/buildstream/_scheduler/jobs/elementjob.py |  12 +-
 src/buildstream/_scheduler/jobs/job.py        | 186 +++++---------------------
 2 files changed, 31 insertions(+), 167 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index 3d04beba9..28bb65d80 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -16,7 +16,7 @@
 #        Tristan Daniël Maat <[email protected]>
 #
 
-from .job import Job, ChildJob
+from .job import Job
 
 
 # ElementJob()
@@ -76,16 +76,6 @@ class ElementJob(Job):
     def parent_complete(self, status, result):
         self._complete_cb(self, self._element, status, self._result)
 
-    def create_child_job(self, *args, **kwargs):
-        return ChildElementJob(*args, element=self._element, 
action_cb=self._action_cb, **kwargs)
-
-
-class ChildElementJob(ChildJob):
-    def __init__(self, *args, element, action_cb, **kwargs):
-        super().__init__(*args, **kwargs)
-        self._element = element
-        self._action_cb = action_cb
-
     def child_process(self):
 
         # Run the action
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 08bdbcd9d..c57f8d29f 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -67,23 +67,6 @@ class JobStatus(FastEnum):
 # process. It has some methods that are not implemented - they are meant for
 # you to implement in a subclass.
 #
-# It has a close relationship with the ChildJob class, and it can be considered
-# a two part solution:
-#
-# 1. A Job instance, which will create a ChildJob instance and arrange for
-#    childjob.child_process() to be executed in another process.
-# 2. The created ChildJob instance, which does the actual work.
-#
-# This split makes it clear what data is passed to the other process and what
-# is executed in which process.
-#
-# To set up a minimal new kind of Job, e.g. YourJob:
-#
-# 1. Create a YourJob class, inheriting from Job.
-# 2. Create a YourChildJob class, inheriting from ChildJob.
-# 3. Implement YourJob.create_child_job() and YourJob.parent_complete().
-# 4. Implement YourChildJob.child_process().
-#
 # Args:
 #    scheduler (Scheduler): The scheduler
 #    action_name (str): The queue action name
@@ -123,7 +106,10 @@ class Job:
         self._element = None  # The Element() passed to the Job() constructor, 
if applicable
 
         self._task = None  # The task that is run
-        self._child = None
+
+        self._thread_id = None  # Thread in which the child executes its action
+        self._should_terminate = False
+        self._terminate_lock = threading.Lock()
 
     # set_name()
     #
@@ -141,22 +127,10 @@ class Job:
 
         self._tries += 1
 
-        # FIXME: remove the parent/child separation, it's not needed anymore.
-        self._child = self.create_child_job(  # pylint: 
disable=assignment-from-no-return
-            self.action_name,
-            self._messenger,
-            self._scheduler.context.logdir,
-            self._logfile,
-            self._max_retries,
-            self._tries,
-            self._message_element_name,
-            self._message_element_key,
-        )
-
         loop = asyncio.get_event_loop()
 
         async def execute():
-            ret_code, self._result = await loop.run_in_executor(None, 
self._child.child_action)
+            ret_code, self._result = await loop.run_in_executor(None, 
self.child_action)
             await self._parent_child_completed(ret_code)
 
         self._task = loop.create_task(execute())
@@ -171,8 +145,17 @@ class Job:
         self.message(MessageType.STATUS, "{} 
terminating".format(self.action_name))
 
         if self._task:
-            self._child.terminate()
+            assert utils._is_in_main_thread(), "Terminating the job's thread 
should only be done from the scheduler"
+
+            if self._should_terminate:
+                return
+
+            with self._terminate_lock:
+                self._should_terminate = True
+                if self._thread_id is None:
+                    return
 
+            terminate_thread(self._thread_id)
         self._terminated = True
 
     # get_terminated()
@@ -244,6 +227,20 @@ class Job:
     #                  Abstract Methods                   #
     #######################################################
 
+    # child_process()
+    #
+    # This will be executed in a worker thread (not a separate process), and
+    # is intended to perform the job's task.
+    #
+    # Returns:
+    #    (any): The result of the job's task. Since it stays within the same
+    #           process, it no longer needs to be pickle-able; it is handed
+    #           back to the event loop that started the job and is taken as
+    #           the result of the Job.
+    #
+    def child_process(self):
+        raise ImplError("Job '{kind}' does not implement 
child_process()".format(kind=type(self).__name__))
+
     # parent_complete()
     #
     # This will be executed in the main process after the job finishes, and is
@@ -256,24 +253,6 @@ class Job:
     def parent_complete(self, status, result):
         raise ImplError("Job '{kind}' does not implement 
parent_complete()".format(kind=type(self).__name__))
 
-    # create_child_job()
-    #
-    # Called by a Job instance to create a child job.
-    #
-    # The child job object is an instance of a subclass of ChildJob.
-    #
-    # The child job object's child_process() method will be executed in another
-    # process, so that work is done in parallel. See the documentation for the
-    # Job class for more information on this relationship.
-    #
-    # This method must be overridden by Job subclasses.
-    #
-    # Returns:
-    #    (ChildJob): An instance of a subclass of ChildJob.
-    #
-    def create_child_job(self, *args, **kwargs):
-        raise ImplError("Job '{kind}' does not implement 
create_child_job()".format(kind=type(self).__name__))
-
     #######################################################
     #                  Local Private Methods              #
     #######################################################
@@ -326,92 +305,6 @@ class Job:
         self._scheduler.job_completed(self, status)
         self._task = None
 
-
-# ChildJob()
-#
-# The ChildJob object represents the part of a parallel task that will run in a
-# separate process. It has a close relationship with the parent Job that
-# created it.
-#
-# See the documentation of the Job class for more on their relationship, and
-# how to set up a (Job, ChildJob pair).
-#
-# The args below are passed from the parent Job to the ChildJob.
-#
-# Args:
-#    scheduler (Scheduler): The scheduler.
-#    action_name (str): The queue action name.
-#    logfile (str): A template string that points to the logfile
-#                   that should be used - should contain {pid}.
-#    max_retries (int): The maximum number of retries.
-#    tries (int): The number of retries so far.
-#    message_element_name (str): None, or the plugin instance element name
-#                                to be supplied to the Message() constructor.
-#    message_element_key (tuple): None, or the element display key tuple
-#                                to be supplied to the Message() constructor.
-#
-class ChildJob:
-    def __init__(
-        self, action_name, messenger, logdir, logfile, max_retries, tries, 
message_element_name, message_element_key
-    ):
-
-        self.action_name = action_name
-
-        self._messenger = messenger
-        self._logdir = logdir
-        self._logfile = logfile
-        self._max_retries = max_retries
-        self._tries = tries
-        self._message_element_name = message_element_name
-        self._message_element_key = message_element_key
-
-        self._thread_id = None  # Thread in which the child executes its action
-        self._should_terminate = False
-        self._terminate_lock = threading.Lock()
-
-    # message():
-    #
-    # Logs a message, this will be logged in the task's logfile and
-    # conditionally also be sent to the frontend.
-    #
-    # Args:
-    #    message_type (MessageType): The type of message to send
-    #    message (str): The message
-    #    kwargs: Remaining Message() constructor arguments, note
-    #            element_key is set in _child_message_handler
-    #            for front end display if not already set or explicitly
-    #            overriden here.
-    #
-    def message(self, message_type, message, **kwargs):
-        kwargs["scheduler"] = True
-        self._messenger.message(
-            Message(
-                message_type,
-                message,
-                element_name=self._message_element_name,
-                element_key=self._message_element_key,
-                **kwargs
-            )
-        )
-
-    #######################################################
-    #                  Abstract Methods                   #
-    #######################################################
-
-    # child_process()
-    #
-    # This will be executed after starting the child process, and is intended
-    # to perform the job's task.
-    #
-    # Returns:
-    #    (any): A simple object (must be pickle-able, i.e. strings, lists,
-    #           dicts, numbers, but not Element instances). It is returned to
-    #           the parent Job running in the main process. This is taken as
-    #           the result of the Job.
-    #
-    def child_process(self):
-        raise ImplError("ChildJob '{kind}' does not implement 
child_process()".format(kind=type(self).__name__))
-
     # child_action()
     #
     # Perform the action in the child process, this calls the action_cb.
@@ -426,7 +319,7 @@ class ChildJob:
         # Time, log and and run the action function
         #
         with self._messenger.timed_suspendable() as timeinfo, 
self._messenger.recorded_messages(
-            self._logfile, self._logdir
+            self._logfile, self._scheduler.context.logdir
         ) as filename:
             try:
                 self.message(MessageType.START, self.action_name, 
logfile=filename)
@@ -499,22 +392,3 @@ class ChildJob:
             except TerminateException:
                 self._thread_id = None
                 return _ReturnCode.TERMINATED, None
-
-    # terminate()
-    #
-    # Ask the the current child thread to terminate
-    #
-    # This should only ever be called from the main thread.
-    #
-    def terminate(self):
-        assert utils._is_in_main_thread(), "Terminating the job's thread 
should only be done from the scheduler"
-
-        if self._should_terminate:
-            return
-
-        with self._terminate_lock:
-            self._should_terminate = True
-            if self._thread_id is None:
-                return
-
-        terminate_thread(self._thread_id)

Reply via email to