This is an automated email from the ASF dual-hosted git repository. not-in-ldap pushed a commit to branch tpollard/subrebase in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 587fcb89048babdfee6d6e85a51f55b0238ccdcf Author: Tom Pollard <[email protected]> AuthorDate: Fri Oct 11 10:45:58 2019 +0100 basic async in stream --- src/buildstream/_context.py | 3 + src/buildstream/_exceptions.py | 10 +++ src/buildstream/_frontend/app.py | 57 ++++++------ src/buildstream/_frontend/status.py | 1 + src/buildstream/_scheduler/scheduler.py | 54 ++++++++---- src/buildstream/_stream.py | 148 ++++++++++++++++++++++++-------- 6 files changed, 196 insertions(+), 77 deletions(-) diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index f426f4b..d7a4437 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -174,6 +174,9 @@ class Context: self._workspace_project_cache = WorkspaceProjectCache() self._cascache = None + # An exception caught from subprocessing, used to handle run exceptions in App + self._subprocess_exception = None + # __enter__() # # Called when entering the with-statement context. diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py index 85fcf61..69acc69 100644 --- a/src/buildstream/_exceptions.py +++ b/src/buildstream/_exceptions.py @@ -45,6 +45,16 @@ def get_last_exception(): return le +# set_last_exception() +# +# Sets the last exception from the main process, used if Stream is running a subprocess +# +def set_last_exception(exception): + if "BST_TEST_SUITE" in os.environ: + global _last_exception + _last_exception = exception + + # get_last_task_error() # # Fetches the last exception from a task diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index 471901f..5fe38ce 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -300,39 +300,28 @@ class App: try: yield except BstError as e: + self._handle_run_exception(e, session_name) - # Print a nice summary if this is a session - if session_name: - elapsed = self.stream.elapsed_time - - if isinstance(e, StreamError) and e.terminated: # pylint: 
disable=no-member - self._message(MessageType.WARN, session_name + " Terminated", elapsed=elapsed) - else: - self._message(MessageType.FAIL, session_name, elapsed=elapsed) - - # Notify session failure - self._notify("{} failed".format(session_name), e) - - if self._started: - self._print_summary() - - # Exit with the error - self._error_exit(e) except RecursionError: click.echo( "RecursionError: Dependency depth is too large. Maximum recursion depth exceeded.", err=True ) sys.exit(-1) - else: + if self.context._subprocess_exception: + # If a handled exception was thrown in a Stream subprocessed asyncio method, handle it + if isinstance(self.context._subprocess_exception, BstError): + self._handle_run_exception(self.context._subprocess_exception, session_name) + else: + # We don't gracefully handle non BstError() Exceptions + raise self.context._subprocess_exception # pylint: disable=raising-bad-type + elif session_name: # No exceptions occurred, print session time and summary - if session_name: - self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time) - if self._started: - self._print_summary() - - # Notify session success - self._notify("{} succeeded".format(session_name), "") + self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time) + if self._started: + self._print_summary() + # Notify session success + self._notify("{} succeeded".format(session_name), "") # init_project() # @@ -972,6 +961,24 @@ class App: return (project_name, format_version, element_path) + def _handle_run_exception(self, exception, session_name): + # Print a nice summary if this is a session + if session_name: + elapsed = self.stream.elapsed_time + + if isinstance(exception, StreamError) and exception.terminated: # pylint: disable=no-member + self._message(MessageType.WARN, session_name + " Terminated", elapsed=elapsed) + else: + self._message(MessageType.FAIL, session_name, elapsed=elapsed) + + # Notify session failure + self._notify("{} 
failed".format(session_name), exception) + + if self._started: + self._print_summary() + + self._error_exit(exception) + # # Return a value processor for partial choice matching. diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py index d3132fe..f16e7d1 100644 --- a/src/buildstream/_frontend/status.py +++ b/src/buildstream/_frontend/status.py @@ -357,6 +357,7 @@ class _StatusHeader: # # ========= 00:00:00 project-name (143/387) ========= # + session = self._stream.len_session_elements total = self._stream.len_total_elements diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index e35d34e..ab85c35 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -72,6 +72,8 @@ class NotificationType(FastEnum): START = "start" TASK_GROUPS = "task_groups" ELEMENT_TOTALS = "element_totals" + FINISH = "finish" + SIGTSTP = "sigstp" # Notification() @@ -184,6 +186,9 @@ class Scheduler: # Hold on to the queues to process self.queues = queues + # Check if we're subprocessed + subprocessed = bool(self._notify_front_queue) + # Ensure that we have a fresh new event loop, in case we want # to run another test in this thread. 
self.loop = asyncio.new_event_loop() @@ -198,10 +203,11 @@ class Scheduler: # Handle unix signals while running self._connect_signals() - # Watch casd while running to ensure it doesn't die - self._casd_process = casd_process_manager.process - _watcher = asyncio.get_child_watcher() - _watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure) + # If we're not in a subprocess, watch casd while running to ensure it doesn't die + if not subprocessed: + self._casd_process = casd_process_manager.process + _watcher = asyncio.get_child_watcher() + _watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure) # Add notification listener if in subprocess self._start_listening() @@ -215,9 +221,11 @@ class Scheduler: self._stop_listening() self.loop.close() - # Stop watching casd - _watcher.remove_child_handler(self._casd_process.pid) - self._casd_process = None + # Stop watching casd if not subprocessed + if self._casd_process: + _watcher.remove_child_handler(self._casd_process.pid) + _watcher.close() + self._casd_process = None # Stop handling unix signals self._disconnect_signals() @@ -236,7 +244,7 @@ class Scheduler: status = SchedStatus.SUCCESS # Send the state taskgroups if we're running under the subprocess - if self._notify_front_queue: + if subprocessed: # Don't pickle state for group in self._state.task_groups.values(): group._state = None @@ -529,6 +537,8 @@ class Scheduler: if self.terminated: return + # This event handler is only set when not running in a subprocess, scheduler + # to handle keyboard interrupt notification = Notification(NotificationType.INTERRUPT) self._notify_front(notification) @@ -558,17 +568,29 @@ class Scheduler: # _connect_signals(): # - # Connects our signal handler event callbacks to the mainloop + # Connects our signal handler event callbacks to the mainloop. 
Signals + # only need to be connected if scheduler running in the 'main' process # def _connect_signals(self): - self.loop.add_signal_handler(signal.SIGINT, self._interrupt_event) - self.loop.add_signal_handler(signal.SIGTERM, self._terminate_event) - self.loop.add_signal_handler(signal.SIGTSTP, self._suspend_event) + if not self._notify_front_queue: + self.loop.add_signal_handler(signal.SIGINT, self._interrupt_event) + self.loop.add_signal_handler(signal.SIGTERM, self._terminate_event) + self.loop.add_signal_handler(signal.SIGTSTP, self._suspend_event) + # _disconnect_signals(): + # + # Disconnects our signal handler event callbacks from the mainloop. Signals + # only need to be disconnected if scheduler running in the 'main' process + # def _disconnect_signals(self): - self.loop.remove_signal_handler(signal.SIGINT) - self.loop.remove_signal_handler(signal.SIGTSTP) - self.loop.remove_signal_handler(signal.SIGTERM) + if not self._notify_front_queue: + self.loop.remove_signal_handler(signal.SIGINT) + self.loop.remove_signal_handler(signal.SIGTSTP) + self.loop.remove_signal_handler(signal.SIGTERM) + else: + # If running in a subprocess, ignore SIGINT when disconnected + # under the interrupted click.prompt() + signal.signal(signal.SIGINT, signal.SIG_IGN) def _terminate_jobs_real(self): def kill_jobs(): @@ -616,6 +638,8 @@ class Scheduler: self.jobs_unsuspended() elif notification.notification_type == NotificationType.RETRY: self._failure_retry(notification.job_action, notification.element) + elif notification.notification_type == NotificationType.SIGTSTP: + self._suspend_event() else: # Do not raise exception once scheduler process is separated # as we don't want to pickle exceptions between processes diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index b0cebc0..438e733 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -30,6 +30,7 @@ import shutil import tarfile import tempfile import queue +import signal from 
contextlib import contextmanager, suppress from fnmatch import fnmatch from typing import List, Tuple @@ -44,6 +45,7 @@ from ._exceptions import ( ArtifactError, set_last_task_error, SubprocessException, + set_last_exception, ) from ._message import Message, MessageType from ._scheduler import ( @@ -65,7 +67,7 @@ from ._profile import Topics, PROFILER from ._state import State from .types import _KeyStrength, _SchedulerErrorAction from .plugin import Plugin -from . import utils, _yaml, _site +from . import utils, _yaml, _site, _signals from . import Scope # Stream() @@ -91,8 +93,9 @@ class Stream: self.session_elements = [] # List of elements being processed this session self.total_elements = [] # Total list of elements based on targets self.queues = [] # Queue objects - self.len_session_elements = None - self.len_total_elements = None + self.len_session_elements = "" + self.len_total_elements = "" + self.loop = None # # Private members @@ -117,6 +120,8 @@ class Stream: self._scheduler_suspended = False self._notify_front_queue = None self._notify_back_queue = None + self._casd_process = None + self._watcher = None # init() # @@ -134,10 +139,13 @@ class Stream: # Add traceback pickling support pickling_support.install() - try: - func(*args, **kwargs) - except Exception as e: # pylint: disable=broad-except - notify.put(Notification(NotificationType.EXCEPTION, exception=SubprocessException(e))) + with _signals.blocked([signal.SIGINT, signal.SIGTERM, signal.SIGTSTP], ignore=True): + try: + func(*args, **kwargs) + except Exception as e: # pylint: disable=broad-except + notify.put(Notification(NotificationType.EXCEPTION, exception=SubprocessException(e))) + + notify.put(Notification(NotificationType.FINISH)) def run_in_subprocess(self, func, *args, **kwargs): assert not self._subprocess @@ -161,33 +169,48 @@ class Stream: self._subprocess.start() - # TODO connect signal handlers with asyncio - while self._subprocess.exitcode is None: - # check every given time interval 
on subprocess state - self._subprocess.join(0.01) - # if no exit code, go back to checking the message queue - self._loop() - print("Stopping loop...") + # We can now launch another async + self.loop = asyncio.new_event_loop() + self._connect_signals() + self._start_listening() + self.loop.set_exception_handler(self._handle_exception) + self._watch_casd() + self.loop.run_forever() + + # Scheduler has stopped running, so safe to still have async here + self._stop_listening() + self._stop_watching_casd() + self.loop.close() + self._disconnect_signals() + self.loop = None + self._subprocess.join() + self._subprocess = None # Ensure no more notifcations to process - try: - while True: - notification = self._notify_front_queue.get_nowait() - self._notification_handler(notification) - except queue.Empty: - print("Finished processing notifications") - pass + while not self._notify_front_queue.empty(): + notification = self._notify_front_queue.get_nowait() + self._notification_handler(notification) # cleanup() # # Cleans up application state # def cleanup(self): - # Close the notification queue + # Close the notification queues for q in [self._notify_back_queue, self._notify_front_queue]: if q is not None: q.close() - # self._notification_queue.cancel_join_thread() + q.join_thread() + q = None + + # Close loop + if self.loop is not None: + self.loop.close() + self.loop = None + + # Ensure global event loop policy is unset + asyncio.set_event_loop_policy(None) + if self._project: self._project.cleanup() @@ -1184,10 +1207,14 @@ class Stream: # Send the notification to suspend jobs notification = Notification(NotificationType.SUSPEND) self._notify_back(notification) + # Disconnect signals if stream is handling them + self._disconnect_signals() yield # Unsuspend jobs on context exit notification = Notification(NotificationType.UNSUSPEND) self._notify_back(notification) + # Connect signals if stream is handling them + self._connect_signals() 
############################################################# # Private Methods # @@ -1431,13 +1458,13 @@ class Stream: # self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL)) - if self._session_start_callback is not None: - self._notify_front(Notification(NotificationType.START)) - # Also send through the session & total elements list lengths for status rendering element_totals = str(len(self.session_elements)), str(len(self.total_elements)) self._notify_front(Notification(NotificationType.ELEMENT_TOTALS, element_totals=element_totals)) + if self._session_start_callback is not None: + self._notify_front(Notification(NotificationType.START)) + status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager()) if status == SchedStatus.ERROR: @@ -1738,6 +1765,9 @@ class Stream: self._session_start_callback() elif notification.notification_type == NotificationType.ELEMENT_TOTALS: self.len_session_elements, self.len_total_elements = notification.element_totals + elif notification.notification_type == NotificationType.FINISH: + if self.loop: + self.loop.stop() else: raise StreamError("Unrecognised notification type received") @@ -1753,18 +1783,62 @@ class Stream: else: self._notification_handler(notification) - # The code to be run by the Stream's event loop while delegating - # work to a subprocess with the @subprocessed decorator def _loop(self): - assert self._notify_front_queue - # Check for and process new messages - while True: - try: - notification = self._notify_front_queue.get_nowait() - self._notification_handler(notification) - except queue.Empty: - notification = None - break + while not self._notify_front_queue.empty(): + notification = self._notify_front_queue.get_nowait() + self._notification_handler(notification) + + def _start_listening(self): + if self._notify_front_queue: + self.loop.add_reader(self._notify_front_queue._reader.fileno(), self._loop) + + def _stop_listening(self): + if 
self._notify_front_queue: + self.loop.remove_reader(self._notify_front_queue._reader.fileno()) + + def _watch_casd(self): + if self._context.get_cascache().get_casd_process_manager().process: + self._casd_process = self._context.get_cascache().get_casd_process_manager().process + self._watcher = asyncio.get_child_watcher() + self._watcher.attach_loop(self.loop) + self._watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure) + + def _abort_on_casd_failure(self, pid, returncode): + message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.") + self._notify_front(Notification(NotificationType.MESSAGE, message=message)) + self._casd_process.returncode = returncode + notification = Notification(NotificationType.TERMINATE) + self._notify_back(notification) + + def _stop_watching_casd(self): + self._watcher.remove_child_handler(self._casd_process.pid) + self._watcher.close() + self._casd_process = None + + def _handle_exception(self, loop, context): + exception = context.get("exception") + # Set the last exception for the test suite if needed + set_last_exception(exception) + # Add it to context + self._context._subprocess_exception = exception + self.loop.stop() + + def _connect_signals(self): + if self.loop: + self.loop.add_signal_handler(signal.SIGINT, self._interrupt_callback) + self.loop.add_signal_handler( + signal.SIGTERM, lambda: self._notify_back(Notification(NotificationType.TERMINATE)) + ) + self.loop.add_signal_handler( + signal.SIGTSTP, lambda: self._notify_back(Notification(NotificationType.SIGTSTP)) + ) + + def _disconnect_signals(self): + if self.loop: + self.loop.remove_signal_handler(signal.SIGINT) + self.loop.remove_signal_handler(signal.SIGTSTP) + self.loop.remove_signal_handler(signal.SIGTERM) + signal.set_wakeup_fd(-1) def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing
