This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch bschubert/remove-pipe-job
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 2149c97314dc012d437c71a33f59c52391572227
Author: Benjamin Schubert <[email protected]>
AuthorDate: Mon Jan 11 12:00:07 2021 +0000

    job.py: Completely remove the pipe between child and parent process

    This pipe is not needed at all anymore
---
 src/buildstream/_scheduler/jobs/job.py | 80 ++--------------------------------
 1 file changed, 3 insertions(+), 77 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index b6d7e6c..4e81931 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -24,7 +24,6 @@
 import asyncio
 import datetime
 import itertools
-import multiprocessing
 import threading
 import traceback
 
@@ -113,8 +112,6 @@ class Job:
         #
         self._scheduler = scheduler  # The scheduler
         self._messenger = self._scheduler.context.messenger
-        self._pipe_r = None  # The read end of a pipe for message passing
-        self._listening = False  # Whether the parent is currently listening
         self._suspended = False  # Whether this job is currently suspended
         self._max_retries = max_retries  # Maximum number of automatic retries
         self._result = None  # Return value of child action in the parent
@@ -143,11 +140,7 @@ class Job:
 
         assert not self._terminated, "Attempted to start process which was already terminated"
 
-        # FIXME: remove this, this is not necessary when using asyncio
-        self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
-
         self._tries += 1
-        self._parent_start_listening()
 
         # FIXME: remove the parent/child separation, it's not needed anymore.
         self._child = self.create_child_job(  # pylint: disable=assignment-from-no-return
@@ -164,7 +157,7 @@ class Job:
         loop = asyncio.get_event_loop()
 
         async def execute():
-            ret_code, self._result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
+            ret_code, self._result = await loop.run_in_executor(None, self._child.child_action)
             await self._parent_child_completed(ret_code)
 
         self._task = loop.create_task(execute())
@@ -178,9 +171,6 @@ class Job:
     def terminate(self):
         self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
 
-        # Make sure there is no garbage on the pipe
-        self._parent_stop_listening()
-
         if self._task:
             self._child.terminate()
 
@@ -289,16 +279,6 @@ class Job:
     #                  Local Private Methods              #
     #######################################################
 
-    # _parent_shutdown()
-    #
-    # Shuts down the Job on the parent side by reading any remaining
-    # messages on the message pipe and cleaning up any resources.
-    #
-    def _parent_shutdown(self):
-        # Make sure we've read everything we need and then stop listening
-        self._parent_process_pipe()
-        self._parent_stop_listening()
-
     # _parent_child_completed()
     #
     # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
@@ -307,8 +287,6 @@ class Job:
     #    returncode (int): The return code of the child process
     #
     async def _parent_child_completed(self, returncode):
-        self._parent_shutdown()
-
         try:
             returncode = _ReturnCode(returncode)
         except ValueError:
@@ -347,50 +325,7 @@ class Job:
 
         self.parent_complete(status, self._result)
         self._scheduler.job_completed(self, status)
-
-        # Force the deletion of the pipe and process objects to try and clean up FDs
-        self._pipe_r.close()
-        self._pipe_r = self._task = None
-
-    # _parent_process_pipe()
-    #
-    # Reads back message envelopes from the message pipe
-    # in the parent process.
-    #
-    def _parent_process_pipe(self):
-        while self._pipe_r.poll():
-            try:
-                self._pipe_r.recv()
-                assert False, "No message should be received anymore"
-            except EOFError:
-                self._parent_stop_listening()
-                break
-
-    # _parent_recv()
-    #
-    # A callback to handle I/O events from the message
-    # pipe file descriptor in the main process message loop
-    #
-    def _parent_recv(self, *args):
-        self._parent_process_pipe()
-
-    # _parent_start_listening()
-    #
-    # Starts listening on the message pipe
-    #
-    def _parent_start_listening(self):
-        if not self._listening:
-            self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv)
-            self._listening = True
-
-    # _parent_stop_listening()
-    #
-    # Stops listening on the message pipe
-    #
-    def _parent_stop_listening(self):
-        if self._listening:
-            self._scheduler.loop.remove_reader(self._pipe_r.fileno())
-            self._listening = False
+        self._task = None
 
 
 # ChildJob()
@@ -431,7 +366,6 @@ class ChildJob:
         self._message_element_name = message_element_name
         self._message_element_key = message_element_key
 
-        self._pipe_w = None  # The write end of a pipe for message passing
         self._thread_id = None  # Thread in which the child executes its action
         self._should_terminate = False
         self._terminate_lock = threading.Lock()
@@ -483,15 +417,9 @@ class ChildJob:
     #
     # Perform the action in the child process, this calls the action_cb.
     #
-    # Args:
-    #    pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
-    #
-    def child_action(self, pipe_w):
-        # Assign the pipe we passed across the process boundaries
-        #
+    def child_action(self):
         # Set the global message handler in this child
         # process to forward messages to the parent process
-        self._pipe_w = pipe_w
         self._messenger.setup_new_action_context(
             self.action_name, self._message_element_name, self._message_element_key
         )
@@ -572,8 +500,6 @@ class ChildJob:
         except TerminateException:
             self._thread_id = None
             return _ReturnCode.TERMINATED, None
-        finally:
-            self._pipe_w.close()
 
     # terminate()
     #
