This is an automated email from the ASF dual-hosted git repository. github-bot pushed a commit to branch bschubert/no-multiprocessing-full in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 5e118e25031ac132cfed6ad1fd951be5560d9a62 Author: Benjamin Schubert <[email protected]> AuthorDate: Thu Jul 9 15:16:53 2020 +0100 WIP --- setup.py | 1 + src/buildstream/_scheduler/jobs/_job.pyx | 7 +++++++ src/buildstream/_scheduler/jobs/job.py | 16 ++++++++++++++-- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index d89d5e6..91232ea 100755 --- a/setup.py +++ b/setup.py @@ -305,6 +305,7 @@ BUILD_EXTENSIONS = [] register_cython_module("buildstream.node") register_cython_module("buildstream._loader._loader") register_cython_module("buildstream._loader.loadelement", dependencies=["buildstream.node"]) +register_cython_module("buildstream._scheduler.jobs._job") register_cython_module("buildstream._yaml", dependencies=["buildstream.node"]) register_cython_module("buildstream._types") register_cython_module("buildstream._utils") diff --git a/src/buildstream/_scheduler/jobs/_job.pyx b/src/buildstream/_scheduler/jobs/_job.pyx new file mode 100644 index 0000000..a928305 --- /dev/null +++ b/src/buildstream/_scheduler/jobs/_job.pyx @@ -0,0 +1,7 @@ +from cpython.pystate cimport PyThreadState_SetAsyncExc +from cpython.ref cimport PyObject + + +def abort_thread(long id): + res = PyThreadState_SetAsyncExc(id, <PyObject*> BaseException) + assert res == 1 diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index d461d0f..7d3519d 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -29,7 +29,9 @@ import traceback from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType from ...types import FastEnum +from ._job import abort_thread +import threading # Return code values shutdown of job handling child processes # @@ -118,6 +120,7 @@ class Job: self._element = None # The Element() passed to the Job() constructor, if applicable self._task = None # The task that is run + self._child = None # set_name() # @@ -136,7 +139,7 @@ class Job: self._tries += 1 # FIXME: remove the parent/child separation, it's not needed anymore. - child_job = self.create_child_job( # pylint: disable=assignment-from-no-return + self._child = self.create_child_job( # pylint: disable=assignment-from-no-return self.action_name, self._messenger, self._scheduler.context.logdir, @@ -151,7 +154,7 @@ class Job: async def execute(): try: - ret_code, self._result = await loop.run_in_executor(None, child_job.child_action) + ret_code, self._result = await loop.run_in_executor(None, self._child.child_action) except asyncio.CancelledError: ret_code = _ReturnCode.TERMINATED except Exception: # pylint: disable=broad-except @@ -172,6 +175,7 @@ class Job: # Terminate the process using multiprocessing API pathway if self._task: self._task.cancel() + self._child.terminate() self._terminated = True @@ -425,6 +429,8 @@ class ChildJob: # Perform the action in the child process, this calls the action_cb. # def child_action(self): + self._thread_id = threading.current_thread().ident + # Time, log and and run the action function # with self._messenger.timed_suspendable() as timeinfo, self._messenger.record_job( @@ -480,3 +486,9 @@ class ChildJob: # make sure we dont try to handle SIGTERM while the process # is already busy in sys.exit() return _ReturnCode.OK, result + + def terminate(self): + if self._thread_id is None: + return + + abort_thread(self._thread_id)
