This is an automated email from the ASF dual-hosted git repository. github-bot pushed a commit to branch bschubert/no-multiprocessing-full in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 58986f982d34177f753feaf9501038e43200bd73 Author: Benjamin Schubert <[email protected]> AuthorDate: Wed Jul 8 19:17:05 2020 +0000 _messenger.py: Make the messenger aware of jobs and stop having multiple --- src/buildstream/_frontend/app.py | 8 +- src/buildstream/_messenger.py | 245 +++++++++++++++++++-------------- src/buildstream/_scheduler/jobs/job.py | 65 ++------- tests/testutils/context.py | 2 +- 4 files changed, 154 insertions(+), 166 deletions(-) diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index 5d49e96..88c11c1 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -34,7 +34,7 @@ from .._context import Context from .._project import Project from .._exceptions import BstError, StreamError, LoadError, AppError from ..exceptions import LoadErrorReason -from .._message import Message, MessageType, unconditional_messages +from .._message import Message, MessageType from .._stream import Stream from ..types import _SchedulerErrorAction from .. import node @@ -791,7 +791,7 @@ class App: # # Handle messages from the pipeline # - def _message_handler(self, message, is_silenced): + def _message_handler(self, message): # Drop status messages from the UI if not verbose, we'll still see # info messages and status messages will still go to the log files. @@ -802,10 +802,6 @@ class App: if message.message_type in [MessageType.FAIL, MessageType.BUG] and message.element_name is not None: self._fail_messages[message.element_name] = message - # Send to frontend if appropriate - if is_silenced and (message.message_type not in unconditional_messages): - return - # Format the message & cache it text = self.logger.render(message) self._message_text += text diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py index eb3bd51..3220cb1 100644 --- a/src/buildstream/_messenger.py +++ b/src/buildstream/_messenger.py @@ -21,11 +21,11 @@ import os import datetime import threading from contextlib import contextmanager -from typing import Callable, Generator, Optional +from typing import Callable, Generator, Optional, TextIO from . import _signals from ._exceptions import BstError -from ._message import Message, MessageType +from ._message import Message, MessageType, unconditional_messages from ._state import State, _Task @@ -48,9 +48,86 @@ class _TimeData: self.start_time = start_time -class MessageHandlerCallback: - def __call__(self, message: Message, is_silenced: bool) -> None: - pass +class _JobRecorder: + def __init__(self, action_name: str, element_key: str, log_filename: str) -> None: + self.action_name = action_name + self.element_key = element_key + self.log_filename = log_filename + + self.log_handle: Optional[TextIO] = None + self.silence_scope_depth = 0 + + @contextmanager + def enable_recording(self) -> Generator["_JobRecorder", None, None]: + # Ensure the directory exists first + directory = os.path.dirname(self.log_filename) + os.makedirs(directory, exist_ok=True) + + with open(self.log_filename, "a") as logfile: + + # Write one last line to the log and flush it to disk + def flush_log(): + + # If the process currently had something happening in the I/O stack + # then trying to reenter the I/O stack will fire a runtime error. + # + # So just try to flush as well as we can at SIGTERM time + try: + logfile.write("\n\nForcefully terminated\n") + logfile.flush() + except RuntimeError: + os.fsync(logfile.fileno()) + + self.log_handle = logfile + + with _signals.terminator(flush_log): + yield self + + # record_message() + # + # Records the message if recording is enabled + # + # Args: + # message (Message): The message to record + # + def record_message(self, message: Message) -> None: + INDENT = " " + EMPTYTIME = "--:--:--" + template = "[{timecode: <8}] {type: <7}" + + # If this message is associated with an element or source plugin, print the + # full element name of the instance. + element_name = "" + if message.element_name: + template += " {element_name}" + element_name = message.element_name + + template += ": {message}" + + detail = "" + if message.detail is not None: + template += "\n\n{detail}" + detail = message.detail.rstrip("\n") + detail = INDENT + INDENT.join(detail.splitlines(True)) + + timecode = EMPTYTIME + if message.message_type in (MessageType.SUCCESS, MessageType.FAIL): + hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 ** 2) + minutes, seconds = divmod(remainder, 60) + timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds) + + text = template.format( + timecode=timecode, + element_name=element_name, + type=message.message_type.upper(), + message=message.message, + detail=detail, + ) + + # Write to the open log file + assert self.log_handle is not None + self.log_handle.write("{}\n".format(text)) + self.log_handle.flush() class Messenger: @@ -60,11 +137,11 @@ class Messenger: self._active_simple_tasks: int = 0 self._render_status_cb: Optional[Callable[[], None]] = None + self._message_handler: Optional[Callable[[Message], None]] = None + self._global_silence_scope_depth = 0 + self._locals = threading.local() - self._locals.message_handler = None - self._locals.log_handle = None - self._locals.log_filename = None - self._locals.silence_scope_depth = 0 + self._locals.job = None # set_message_handler() # @@ -74,8 +151,8 @@ class Messenger: # Args: # handler: The handler to call on message # - def set_message_handler(self, handler: MessageHandlerCallback) -> None: - self._locals.message_handler = handler + def set_message_handler(self, handler: Callable[[Message], None]) -> None: + self._message_handler = handler # set_state() # @@ -102,7 +179,9 @@ class Messenger: # Returns: Whether messages are currently being silenced # def _silent_messages(self) -> bool: - return self._locals.silence_scope_depth > 0 + if self._locals.job is not None: + return self._locals.job.silence_scope_depth > 0 + return self._global_silence_scope_depth > 0 # message(): # @@ -113,15 +192,30 @@ class Messenger: # message: A Message object # def message(self, message: Message) -> None: - # If we are recording messages, dump a copy into the open log file. - self._record_message(message) + job = self._locals.job + + if job is not None: + message.action_name = job.action_name + message.logfile = job.log_filename - # Send it off to the log handler (can be the frontend, - # or it can be the child task which will propagate - # to the frontend) - assert self._locals.message_handler + # If no key has been set at this point, and the element job has + # a related key, set it. + if message.element_key is None: + message.element_key = job.element_key - self._locals.message_handler(message, is_silenced=self._silent_messages()) + # Job always record messages + self._locals.job.record_message(message) + + # Don't log LOG messages from jobs + if message.message_type == MessageType.LOG: + return + + # Don't forward if it is currently silent + if self._silent_messages() and (message.message_type not in unconditional_messages): + return + + assert self._message_handler is not None + self._message_handler(message) # silence() # @@ -140,12 +234,22 @@ class Messenger: yield return - self._locals.silence_scope_depth += 1 + in_job = self._locals.job is not None + + if in_job: + self._locals.job.silence_scope_depth += 1 + else: + self._global_silence_scope_depth += 1 + try: yield finally: - assert self._locals.silence_scope_depth > 0 - self._locals.silence_scope_depth -= 1 + if in_job: + assert self._locals.job.silence_scope_depth > 0 + self._locals.job.silence_scope_depth -= 1 + else: + assert self._global_silence_scope_depth > 0 + self._global_silence_scope_depth -= 1 # timed_activity() # @@ -254,7 +358,7 @@ class Messenger: ) self.message(message) - # recorded_messages() + # record_job() # # Records all messages in a log file while the context manager # is active. @@ -274,42 +378,20 @@ class Messenger: # Yields: The fully qualified log filename # @contextmanager - def recorded_messages(self, filename: str, logdir: str) -> Generator[str, None, None]: + def record_job( + self, action_name: str, element_key: str, filename: str, logdir: str + ) -> Generator[_JobRecorder, None, None]: # We dont allow recursing in this context manager, and # we also do not allow it in the main process. - assert not hasattr(self._locals, "log_handle") or self._locals.log_handle is None - assert not hasattr(self._locals, "log_filename") or self._locals.log_filename is None + assert not hasattr(self._locals, "job") or self._locals.job is None - # Create the fully qualified logfile in the log directory, - # appending the pid and .log extension at the end. - self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid())) - self._locals.silence_scope_depth = 0 + log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid())) + self._locals.job = _JobRecorder(action_name, element_key, log_filename) - # Ensure the directory exists first - directory = os.path.dirname(self._locals.log_filename) - os.makedirs(directory, exist_ok=True) - - with open(self._locals.log_filename, "a") as logfile: + with self._locals.job.enable_recording() as job: + yield job - # Write one last line to the log and flush it to disk - def flush_log(): - - # If the process currently had something happening in the I/O stack - # then trying to reenter the I/O stack will fire a runtime error. - # - # So just try to flush as well as we can at SIGTERM time - try: - logfile.write("\n\nForcefully terminated\n") - logfile.flush() - except RuntimeError: - os.fsync(logfile.fileno()) - - self._locals.log_handle = logfile - with _signals.terminator(flush_log): - yield self._locals.log_filename - - self._locals.log_handle = None - self._locals.log_filename = None + self._locals.job = None # get_log_handle() # @@ -320,7 +402,9 @@ class Messenger: # Returns: The active logging file handle, or None # def get_log_handle(self) -> Optional[str]: - return self._locals.log_handle + if self._locals.job is not None: + return self._locals.job.log_handle + return None # get_log_filename() # @@ -331,7 +415,7 @@ class Messenger: # Returns: The active logging filename, or None # def get_log_filename(self) -> str: - return self._locals.log_filename + return self._locals.job.log_filename # timed_suspendable() # @@ -361,55 +445,6 @@ class Messenger: with _signals.suspendable(stop_time, resume_time): yield timedata - # _record_message() - # - # Records the message if recording is enabled - # - # Args: - # message (Message): The message to record - # - def _record_message(self, message: Message) -> None: - - if self._locals.log_handle is None: - return - - INDENT = " " - EMPTYTIME = "--:--:--" - template = "[{timecode: <8}] {type: <7}" - - # If this message is associated with an element or source plugin, print the - # full element name of the instance. - element_name = "" - if message.element_name: - template += " {element_name}" - element_name = message.element_name - - template += ": {message}" - - detail = "" - if message.detail is not None: - template += "\n\n{detail}" - detail = message.detail.rstrip("\n") - detail = INDENT + INDENT.join(detail.splitlines(True)) - - timecode = EMPTYTIME - if message.message_type in (MessageType.SUCCESS, MessageType.FAIL): - hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 ** 2) - minutes, seconds = divmod(remainder, 60) - timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds) - - text = template.format( - timecode=timecode, - element_name=element_name, - type=message.message_type.upper(), - message=message.message, - detail=detail, - ) - - # Write to the open log file - self._locals.log_handle.write("{}\n".format(text)) - self._locals.log_handle.flush() - # _render_status() # # Calls the render status callback set in the messenger, but only if a diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index f331d3f..30308a9 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -28,7 +28,7 @@ import traceback # BuildStream toplevel imports from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob -from ..._message import Message, MessageType, unconditional_messages +from ..._message import Message, MessageType from ...types import FastEnum @@ -67,7 +67,6 @@ class _Envelope: class _MessageType(FastEnum): - LOG_MESSAGE = 1 ERROR = 2 RESULT = 3 @@ -389,11 +388,7 @@ class Job: if not self._listening: return - if envelope.message_type is _MessageType.LOG_MESSAGE: - # Propagate received messages from children - # back through the context. - self._messenger.message(envelope.message) - elif envelope.message_type is _MessageType.ERROR: + if envelope.message_type is _MessageType.ERROR: # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state # is currently managed in _exceptions.py @@ -543,21 +538,20 @@ class ChildJob: # Set the global message handler in this child # process to forward messages to the parent process self._pipe_w = pipe_w - self._messenger.set_message_handler(self._child_message_handler) # Time, log and and run the action function # - with self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages( - self._logfile, self._logdir - ) as filename: - self.message(MessageType.START, self.action_name, logfile=filename) + with self._messenger.timed_suspendable() as timeinfo, self._messenger.record_job( + self.action_name, self._message_element_key, self._logfile, self._logdir + ): + self.message(MessageType.START, self.action_name) try: # Try the task action result = self.child_process() # pylint: disable=assignment-from-no-return except SkipJob as e: elapsed = datetime.datetime.now() - timeinfo.start_time - self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename) + self.message(MessageType.SKIPPED, str(e), elapsed=elapsed) # Alert parent of skip by return code return _ReturnCode.SKIPPED @@ -567,15 +561,10 @@ class ChildJob: if retry_flag and (self._tries <= self._max_retries): self.message( - MessageType.FAIL, - "Try #{} failed, retrying".format(self._tries), - elapsed=elapsed, - logfile=filename, + MessageType.FAIL, "Try #{} failed, retrying".format(self._tries), elapsed=elapsed, ) else: - self.message( - MessageType.FAIL, str(e), elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox - ) + self.message(MessageType.FAIL, str(e), elapsed=elapsed, detail=e.detail, sandbox=e.sandbox) # Report the exception to the parent (for internal testing purposes) self._child_send_error(e) @@ -593,7 +582,7 @@ class ChildJob: elapsed = datetime.datetime.now() - timeinfo.start_time detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc()) - self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename) + self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail) # Unhandled exceptions should permenantly fail return _ReturnCode.PERM_FAIL @@ -602,7 +591,7 @@ class ChildJob: self._child_send_result(result) elapsed = datetime.datetime.now() - timeinfo.start_time - self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename) + self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed) # Shutdown needs to stay outside of the above context manager, # make sure we dont try to handle SIGTERM while the process @@ -660,35 +649,3 @@ class ChildJob: def _child_send_result(self, result): if result is not None: self._send_message(_MessageType.RESULT, result) - - # _child_message_handler() - # - # A Context delegate for handling messages, this replaces the - # frontend's main message handler in the context of a child task - # and performs local logging to the local log file before sending - # the message back to the parent process for further propagation. - # The related element display key is added to the message for - # widget rendering if not already set for an element childjob. - # - # Args: - # message (Message): The message to log - # is_silenced (bool) : Whether messages are silenced - # - def _child_message_handler(self, message, is_silenced): - - message.action_name = self.action_name - - # If no key has been set at this point, and the element job has - # a related key, set it. This is needed for messages going - # straight to the message handler from the child process. - if message.element_key is None and self._message_element_key: - message.element_key = self._message_element_key - - # Send to frontend if appropriate - if is_silenced and (message.message_type not in unconditional_messages): - return - - if message.message_type == MessageType.LOG: - return - - self._send_message(_MessageType.LOG_MESSAGE, message) diff --git a/tests/testutils/context.py b/tests/testutils/context.py index 821adef..ab14c1b 100644 --- a/tests/testutils/context.py +++ b/tests/testutils/context.py @@ -23,7 +23,7 @@ from buildstream._context import Context # Handle messages from the pipeline -def _dummy_message_handler(message, is_silenced): +def _dummy_message_handler(message): pass
