This is an automated email from the ASF dual-hosted git repository. not-in-ldap pushed a commit to branch phil/ui-split-refactor in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit d12e62d7aeb8595133b0c732a4b0038ae2479a24 Author: Phil Dawson <[email protected]> AuthorDate: Thu Jul 4 09:55:51 2019 +0100 WIP: Add a way to run stream methods in a subprocess Currently the frontend is getting stuck in an infinite loop. Also contains debugging print statements which will need to be stripped out. --- src/buildstream/_frontend/app.py | 153 ++++++++++++++++---------------- src/buildstream/_scheduler/scheduler.py | 6 +- src/buildstream/_stream.py | 67 ++++++++++---- 3 files changed, 129 insertions(+), 97 deletions(-) diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index 9f90938..90070af 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -596,84 +596,83 @@ class App(): def _handle_failure(self, element, queue, failure): - # Handle non interactive mode setting of what to do when a job fails. - if not self._interactive_failures: + # # Handle non interactive mode setting of what to do when a job fails. + # if not self._interactive_failures: - if self.context.sched_error_action == _SchedulerErrorAction.TERMINATE: - self.stream.terminate() - elif self.context.sched_error_action == _SchedulerErrorAction.QUIT: - self.stream.quit() - elif self.context.sched_error_action == _SchedulerErrorAction.CONTINUE: - pass - return - - assert False - # Interactive mode for element failures - with self._interrupted(): - - summary = ("\n{} failure on element: {}\n".format(failure.action_name, element.name) + - "\n" + - "Choose one of the following options:\n" + - " (c)ontinue - Continue queueing jobs as much as possible\n" + - " (q)uit - Exit after all ongoing jobs complete\n" + - " (t)erminate - Terminate any ongoing jobs and exit\n" + - " (r)etry - Retry this job\n") - if failure.logfile: - summary += " (l)og - View the full log file\n" - if failure.sandbox: - summary += " (s)hell - Drop into a shell in the failed build sandbox\n" - summary += "\nPressing ^C will terminate jobs and exit\n" - 
choices = ['continue', 'quit', 'terminate', 'retry'] - if failure.logfile: - choices += ['log'] - if failure.sandbox: - choices += ['shell'] - - choice = '' - while choice not in ['continue', 'quit', 'terminate', 'retry']: - click.echo(summary, err=True) - - self._notify("BuildStream failure", "{} on element {}" - .format(failure.action_name, element.name)) - - try: - choice = click.prompt("Choice:", default='continue', err=True, - value_proc=_prefix_choice_value_proc(choices)) - except click.Abort: - # Ensure a newline after automatically printed '^C' - click.echo("", err=True) - choice = 'terminate' - - # Handle choices which you can come back from - # - assert choice != 'shell' # This won't work for now - if choice == 'shell': - click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True) - try: - prompt = self.shell_prompt(element) - self.stream.shell(element, Scope.BUILD, prompt, isolate=True, usebuildtree='always') - except BstError as e: - click.echo("Error while attempting to create interactive shell: {}".format(e), err=True) - elif choice == 'log': - with open(failure.logfile, 'r') as logfile: - content = logfile.read() - click.echo_via_pager(content) - - if choice == 'terminate': - click.echo("\nTerminating all jobs\n", err=True) - self.stream.terminate() - else: - if choice == 'quit': - click.echo("\nCompleting ongoing tasks before quitting\n", err=True) - self.stream.quit() - elif choice == 'continue': - click.echo("\nContinuing with other non failing elements\n", err=True) - elif choice == 'retry': - click.echo("\nRetrying failed job\n", err=True) - # FIXME: Outstandingly nasty modification of core state - queue._task_group.failed_tasks.remove(element._get_full_name()) - queue.enqueue([element]) + if self.context.sched_error_action == 'terminate': + self.stream.terminate() + elif self.context.sched_error_action == 'quit': + self.stream.quit() + elif self.context.sched_error_action == 'continue': + pass + return + + # 
assert False + # # Interactive mode for element failures + # with self._interrupted(): + + # summary = ("\n{} failure on element: {}\n".format(failure.action_name, element.name) + + # "\n" + + # "Choose one of the following options:\n" + + # " (c)ontinue - Continue queueing jobs as much as possible\n" + + # " (q)uit - Exit after all ongoing jobs complete\n" + + # " (t)erminate - Terminate any ongoing jobs and exit\n" + + # " (r)etry - Retry this job\n") + # if failure.logfile: + # summary += " (l)og - View the full log file\n" + # if failure.sandbox: + # summary += " (s)hell - Drop into a shell in the failed build sandbox\n" + # summary += "\nPressing ^C will terminate jobs and exit\n" + + # choices = ['continue', 'quit', 'terminate', 'retry'] + # if failure.logfile: + # choices += ['log'] + # if failure.sandbox: + # choices += ['shell'] + + # choice = '' + # while choice not in ['continue', 'quit', 'terminate', 'retry']: + # click.echo(summary, err=True) + + # self._notify("BuildStream failure", "{} on element {}" + # .format(failure.action_name, element.name)) + + # try: + # choice = click.prompt("Choice:", default='continue', err=True, + # value_proc=_prefix_choice_value_proc(choices)) + # except click.Abort: + # # Ensure a newline after automatically printed '^C' + # click.echo("", err=True) + # choice = 'terminate' + + # # Handle choices which you can come back from + # # + # assert choice != 'shell' # This won't work for now + # if choice == 'shell': + # click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True) + # try: + # prompt = self.shell_prompt(element) + # self.stream.shell(element, Scope.BUILD, prompt, isolate=True, usebuildtree='always') + # except BstError as e: + # click.echo("Error while attempting to create interactive shell: {}".format(e), err=True) + # elif choice == 'log': + # with open(failure.logfile, 'r') as logfile: + # content = logfile.read() + # click.echo_via_pager(content) + + # if choice == 
'terminate': + # click.echo("\nTerminating all jobs\n", err=True) + # self.stream.terminate() + # else: + # if choice == 'quit': + # click.echo("\nCompleting ongoing tasks before quitting\n", err=True) + # self.stream.quit() + # elif choice == 'continue': + # click.echo("\nContinuing with other non failing elements\n", err=True) + # elif choice == 'retry': + # click.echo("\nRetrying failed job\n", err=True) + # queue.failed_elements.remove(element) + # queue.enqueue([element]) # # Print the session heading if we've loaded a pipeline and there diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 14ecf30..2e9c740 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -52,6 +52,7 @@ class NotificationType(enum.Enum): JOB_START = "job_start" JOB_COMPLETE = "job_complete" TICK = "tick" + EXCEPTION = "exception" class Notification: @@ -64,7 +65,9 @@ class Notification: job_status=None, failed_element=False, elapsed_time=None, - element=None): + element=None, + exception=None): + self.notification_type = notification_type self.full_name = full_name self.job_action = job_action @@ -72,6 +75,7 @@ class Notification: self.failed_element = failed_element self.elapsed_time = elapsed_time self.element = element + self.exception = exception # Scheduler() diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index e8b3931..d2a4d08 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -19,6 +19,7 @@ # Jürg Billeter <[email protected]> # Tristan Maat <[email protected]> +import asyncio import itertools import functools import multiprocessing as mp @@ -31,6 +32,7 @@ import tarfile import tempfile from contextlib import contextmanager, suppress from fnmatch import fnmatch +import queue from ._artifact import Artifact from ._artifactelement import verify_artifact_ref, ArtifactElement @@ -46,6 +48,40 @@ from . import utils, _yaml, _site from . 
import Scope, Consistency +# A decorator which runs the decorated method to be run in a subprocess +def subprocessed(func): + + @functools.wraps(func) + def _subprocessed(self, *args, **kwargs): + assert self + print("Args: {}".format([*args])) + print("Kwargs: {}".format(list(kwargs.items()))) + assert not self._subprocess + + # TODO use functools to pass arguments to func to make target for subprocess + + # Start subprocessed work + mp_context = mp.get_context(method='spawn') + process_name = "stream-{}".format(func.__name__) + target = functools.partial(func, self, *args, **kwargs) + print("launching subprocess:", process_name) + self._subprocess = mp_context.Process(target=target, name=process_name) + self._subprocess.run() + + # TODO connect signal handlers + + # Run event loop. This event loop should exit once the + # subprocessed work has completed + print("Starting loop...") + while not self._subprocess.exitcode: + self._loop() + print("Stopping loop...") + + # Return result of subprocessed function + + return _subprocessed + + # Stream() # # This is the main, toplevel calling interface in BuildStream core. @@ -77,6 +113,7 @@ class Stream(): # # Private members # + self._subprocess = None self._notification_queue = mp.Queue() self._context = context self._artifacts = None @@ -245,6 +282,7 @@ class Stream(): # If `remote` specified as None, then regular configuration will be used # to determine where to push artifacts to. 
# + @subprocessed def build(self, targets, *, track_targets=None, track_except=None, @@ -1592,6 +1630,9 @@ class Stream(): else: unique_id = None self._state.fail_task(notification.job_action, notification.full_name, unique_id) + elif notification.notification_type == NotificationType.EXCEPTION: + # TODO + pass else: raise StreamError("Unreccognised notification type recieved") @@ -1610,31 +1651,19 @@ class Stream(): # raise TypeError("Stream objects should not be pickled.") - # TODO - # Causes the decorated method to be run in a subprocess - @contextmanager - def subprocessed(self, func, *args, **kwargs): - pass - # Set up event loop - - # Start subprocessed work - - # Run event loop. This event loop should exit once the - # subprocessed work has completed - - # Return result of subprocessed function - # The code to be run by the Stream's event loop while delegating - # work to a subprocess with the @subprocessed + # work to a subprocess with the @subprocessed decorator def _loop(self): assert self._notification_queue - # Check that the subprocessed work has not finished - # TODO # Check for new messages - notification = self._notification_queue.get(block=True, timeout=0.1) + try: + notification = self._notification_queue.get(block=True, timeout=0.1) + except queue.Empty: + notification = None + print("queue empty, continuing...") # Process new messages if notification: + print("handling notifications") self._scheduler_notification_handler(notification) -
