This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 5e118e25031ac132cfed6ad1fd951be5560d9a62
Author: Benjamin Schubert <[email protected]>
AuthorDate: Thu Jul 9 15:16:53 2020 +0100

    WIP
---
 setup.py                                 |  1 +
 src/buildstream/_scheduler/jobs/_job.pyx |  7 +++++++
 src/buildstream/_scheduler/jobs/job.py   | 16 ++++++++++++++--
 3 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/setup.py b/setup.py
index d89d5e6..91232ea 100755
--- a/setup.py
+++ b/setup.py
@@ -305,6 +305,7 @@ BUILD_EXTENSIONS = []
 register_cython_module("buildstream.node")
 register_cython_module("buildstream._loader._loader")
 register_cython_module("buildstream._loader.loadelement", dependencies=["buildstream.node"])
+register_cython_module("buildstream._scheduler.jobs._job")
 register_cython_module("buildstream._yaml", dependencies=["buildstream.node"])
 register_cython_module("buildstream._types")
 register_cython_module("buildstream._utils")
diff --git a/src/buildstream/_scheduler/jobs/_job.pyx b/src/buildstream/_scheduler/jobs/_job.pyx
new file mode 100644
index 0000000..a928305
--- /dev/null
+++ b/src/buildstream/_scheduler/jobs/_job.pyx
@@ -0,0 +1,31 @@
+from cpython.pystate cimport PyThreadState_SetAsyncExc
+from cpython.ref cimport PyObject
+
+
+# abort_thread()
+#
+# Asynchronously raise a BaseException in the thread with the given
+# identifier, in order to interrupt whatever it is running.
+#
+# Args:
+#    id (long): The identifier of the thread to abort, as found in `threading.Thread.ident`
+#
+# Returns:
+#    (bool): True if the exception was scheduled, False if no thread with
+#            this identifier could be found
+#
+# Raises:
+#    SystemError: If more than one thread state was affected
+#
+def abort_thread(long id):
+    cdef int res = PyThreadState_SetAsyncExc(id, <PyObject*> BaseException)
+
+    if res > 1:
+        # More than one thread state was modified, which should never happen:
+        # clear the pending exception again by passing NULL, then report it.
+        PyThreadState_SetAsyncExc(id, NULL)
+        raise SystemError("PyThreadState_SetAsyncExc affected {} thread states".format(res))
+
+    # A result of 0 means that the thread identifier was not found, typically
+    # because the thread already finished; there is nothing left to abort.
+    return res == 1
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index d461d0f..7d3519d 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -29,7 +29,9 @@ import traceback
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
 from ..._message import Message, MessageType
 from ...types import FastEnum
+from ._job import abort_thread
 
+import threading
 
 # Return code values shutdown of job handling child processes
 #
@@ -118,6 +120,7 @@ class Job:
         self._element = None  # The Element() passed to the Job() constructor, if applicable
 
         self._task = None  # The task that is run
+        self._child = None
 
     # set_name()
     #
@@ -136,7 +139,7 @@ class Job:
         self._tries += 1
 
         # FIXME: remove the parent/child separation, it's not needed anymore.
-        child_job = self.create_child_job(  # pylint: disable=assignment-from-no-return
+        self._child = self.create_child_job(  # pylint: disable=assignment-from-no-return
             self.action_name,
             self._messenger,
             self._scheduler.context.logdir,
@@ -151,7 +154,7 @@ class Job:
 
         async def execute():
             try:
-                ret_code, self._result = await loop.run_in_executor(None, child_job.child_action)
+                ret_code, self._result = await loop.run_in_executor(None, self._child.child_action)
             except asyncio.CancelledError:
                 ret_code = _ReturnCode.TERMINATED
             except Exception:  # pylint: disable=broad-except
@@ -172,6 +175,7 @@ class Job:
         # Terminate the process using multiprocessing API pathway
         if self._task:
             self._task.cancel()
+            self._child.terminate()
 
         self._terminated = True
 
@@ -425,6 +429,8 @@ class ChildJob:
     # Perform the action in the child process, this calls the action_cb.
     #
     def child_action(self):
+        self._thread_id = threading.current_thread().ident
+
         # Time, log and and run the action function
         #
         with self._messenger.timed_suspendable() as timeinfo, self._messenger.record_job(
@@ -480,3 +486,24 @@ class ChildJob:
                 # make sure we dont try to handle SIGTERM while the process
                 # is already busy in sys.exit()
                 return _ReturnCode.OK, result
+
+    # terminate()
+    #
+    # Ask the thread running child_action() to abort, by scheduling an
+    # asynchronous BaseException in it through abort_thread().
+    #
+    # This is a no-op when child_action() has not recorded a thread
+    # identifier yet, or when a previous call already requested the abort.
+    #
+    def terminate(self):
+        # `_thread_id` is only assigned once child_action() starts running,
+        # so the attribute may not exist yet if the job gets terminated
+        # before the executor picked it up.
+        thread_id = getattr(self, "_thread_id", None)
+        if thread_id is None:
+            return
+
+        # Forget the identifier first so that calling terminate() again
+        # does not schedule a second exception in the same thread.
+        self._thread_id = None
+        abort_thread(thread_id)

Reply via email to