This is an automated email from the ASF dual-hosted git repository. tvb pushed a commit to branch bschubert/remove-pipe-job in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 01b47778a5139a81f00498d994132434f474f1d4 Author: Benjamin Schubert <[email protected]> AuthorDate: Mon Jan 11 11:48:35 2021 +0000 job.py: Stop using the queue to send data between the child and parent This removes the need to have all messages processed in the master thread, and instead allows them to be done in any thread. * _messenger.py: - Store optional job information in the thread local storage and expand the message with it if it is present. - Make the message handler something global and remove the need to have a thread-specific one. - Have message filter out silenced and LOG messages from jobs * job.py: Remove the job-specific message handler --- src/buildstream/_messenger.py | 56 ++++++++++++++++++++++++++-------- src/buildstream/_scheduler/jobs/job.py | 45 ++++----------------------- 2 files changed, 49 insertions(+), 52 deletions(-) diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py index 01a8cfd..edb79ec 100644 --- a/src/buildstream/_messenger.py +++ b/src/buildstream/_messenger.py @@ -25,7 +25,7 @@ from typing import Optional, Callable, Iterator, TextIO from . import _signals from ._exceptions import BstError -from ._message import Message, MessageType +from ._message import Message, MessageType, unconditional_messages from ._state import State, Task @@ -48,6 +48,13 @@ class _TimeData: self.start_time: datetime.datetime = start_time +class _JobInfo: + def __init__(self, action_name: str, element_name: str, element_key: str) -> None: + self.action_name = action_name + self.element_name = element_name + self.element_key = element_key + + # _MessengerLocal # # Thread local storage for the messenger @@ -56,13 +63,6 @@ class _MessengerLocal(threading.local): def __init__(self) -> None: super().__init__() - # The callback to call when propagating messages - # - # FIXME: The message handler is currently not strongly typed, - # as it uses a kwarg, we cannot declare it with Callable. - # We can use `Protocol` to strongly type this with python >= 3.8 - self.message_handler = None - # The open file handle for this task self.log_handle: Optional[TextIO] = None @@ -72,6 +72,9 @@ class _MessengerLocal(threading.local): # Level of silent messages depth in this task self.silence_scope_depth: int = 0 + # Job + self.job: Optional[_JobInfo] = None + # Messenger() # @@ -97,8 +100,16 @@ class Messenger: # Thread local storage self._locals: _MessengerLocal = _MessengerLocal() - def setup_new_action_context(self) -> None: + # The callback to call when propagating messages + # + # FIXME: The message handler is currently not strongly typed, + # as it uses a kwarg, we cannot declare it with Callable. + # We can use `Protocol` to strongly type this with python >= 3.8 + self._message_handler = None + + def setup_new_action_context(self, action_name: str, element_name: str, element_key: str) -> None: self._locals.silence_scope_depth = 0 + self._locals.job = _JobInfo(action_name, element_name, element_key) # set_message_handler() # @@ -106,7 +117,7 @@ class Messenger: # the messenger. # def set_message_handler(self, handler) -> None: - self._locals.message_handler = handler + self._message_handler = handler # set_state() # @@ -140,12 +151,31 @@ class Messenger: # If we are recording messages, dump a copy into the open log file. self._record_message(message) + # Always add the log filename automatically + message.logfile = self._locals.log_filename + + is_silenced = self._silent_messages() + job = self._locals.job + + if job is not None: + # Automatically add message information from the job context + message.action_name = job.action_name + message.task_element_name = job.element_name + message.task_element_key = job.element_key + + # Don't forward LOG messages from jobs + if message.message_type == MessageType.LOG: + return + + # Don't forward JOB messages if they are currently silent + if is_silenced and (message.message_type not in unconditional_messages): + return + # Send it off to the log handler (can be the frontend, # or it can be the child task which will propagate # to the frontend) - assert self._locals.message_handler - - self._locals.message_handler(message, is_silenced=self._silent_messages()) + assert self._message_handler + self._message_handler(message, is_silenced=is_silenced) # status(): # diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 227d3a2..b6d7e6c 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -31,7 +31,7 @@ import traceback # BuildStream toplevel imports from ... import utils from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob -from ..._message import Message, MessageType, unconditional_messages +from ..._message import Message, MessageType from ...types import FastEnum from ._job import terminate_thread from ..._signals import TerminateException @@ -360,13 +360,12 @@ class Job: def _parent_process_pipe(self): while self._pipe_r.poll(): try: - message = self._pipe_r.recv() + self._pipe_r.recv() + assert False, "No message should be received anymore" except EOFError: self._parent_stop_listening() break - self._messenger.message(message) - # _parent_recv() # # A callback to handle I/O events from the message @@ -493,8 +492,9 @@ class ChildJob: # Set the global message handler in this child # process to forward messages to the parent process self._pipe_w = pipe_w - self._messenger.setup_new_action_context() - self._messenger.set_message_handler(self._child_message_handler) + self._messenger.setup_new_action_context( + self.action_name, self._message_element_name, self._message_element_key + ) # Time, log and and run the action function # @@ -593,36 +593,3 @@ class ChildJob: return terminate_thread(self._thread_id) - - ####################################################### - # Local Private Methods # - ####################################################### - - # _child_message_handler() - # - # A Context delegate for handling messages, this replaces the - # frontend's main message handler in the context of a child task - # and performs local logging to the local log file before sending - # the message back to the parent process for further propagation. - # The related element display key is added to the message for - # widget rendering if not already set for an element childjob. - # - # Args: - # message (Message): The message to log - # is_silenced (bool) : Whether messages are silenced - # - def _child_message_handler(self, message, is_silenced): - - message.action_name = self.action_name - message.task_element_name = self._message_element_name - message.task_element_key = self._message_element_key - - # Send to frontend if appropriate - if is_silenced and (message.message_type not in unconditional_messages): - return - - # Don't bother propagating these to the frontend - if message.message_type == MessageType.LOG: - return - - self._pipe_w.send(message)
