This is an automated email from the ASF dual-hosted git repository. tvb pushed a commit to branch tpollard/streamasync in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit bc84898a26762e5e3cdf8e39a6dae3d8d9d0d97d Author: Tom Pollard <[email protected]> AuthorDate: Mon Sep 16 12:20:06 2019 +0100 Add in dual queue implementation for subprocess build --- src/buildstream/_scheduler/scheduler.py | 42 ++++++++--- src/buildstream/_stream.py | 123 ++++++++++++++++++++++++++------ src/buildstream/testing/_fixtures.py | 1 + src/buildstream/testing/runcli.py | 3 - src/buildstream/utils.py | 8 ++- 5 files changed, 144 insertions(+), 33 deletions(-) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index fa76661..1734782 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -24,6 +24,7 @@ import asyncio from itertools import chain import signal import datetime +import queue # Local imports from .resources import Resources @@ -63,6 +64,7 @@ class NotificationType(FastEnum): RETRY = "retry" MESSAGE = "message" TASK_ERROR = "task_error" + EXCEPTION = "exception" # Notification() @@ -84,7 +86,9 @@ class Notification(): time=None, element=None, message=None, - task_error=None): + task_error=None, + for_scheduler=False, + exception=None): self.notification_type = notification_type self.full_name = full_name self.job_action = job_action @@ -93,6 +97,7 @@ class Notification(): self.element = element self.message = message self.task_error = task_error # Tuple of domain & reason + self.exception = exception # Scheduler() @@ -118,7 +123,7 @@ class Notification(): class Scheduler(): def __init__(self, context, - start_time, state, notification_queue, notifier): + start_time, state, notifier): # # Public members @@ -141,8 +146,10 @@ class Scheduler(): self._queue_jobs = True # Whether we should continue to queue jobs self._state = state - # Bidirectional queue to send notifications back to the Scheduler's owner - self._notification_queue = notification_queue + # Bidirectional pipe to send notifications back to the Scheduler's owner + self._notify_front = None + self._notify_back = None + # Notifier callback to use if not running in a subprocess self._notifier = notifier self.resources = Resources(context.sched_builders, @@ -183,6 +190,10 @@ class Scheduler(): # Handle unix signals while running self._connect_signals() + # Add notification handler + if self._notify_back: + self.loop.call_later(0.01, self._loop) + # Start the profiler with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): # Run the queues @@ -544,12 +555,13 @@ class Scheduler(): queue.enqueue([element]) def _notify(self, notification): - # Scheduler to Stream notifcations on right side - self._notification_queue.append(notification) - self._notifier() + # Check if we need to call the notifier callback + if self._notify_front: + self._notify_front.put(notification) + else: + self._notifier(notification) - def _stream_notification_handler(self): - notification = self._notification_queue.popleft() + def _stream_notification_handler(self, notification): if notification.notification_type == NotificationType.TERMINATE: self.terminate_jobs() elif notification.notification_type == NotificationType.QUIT: @@ -565,6 +577,18 @@ class Scheduler(): # as we don't want to pickle exceptions between processes raise ValueError("Unrecognised notification type received") + def _loop(self): + assert self._notify_back + # Check for and process new messages + while True: + try: + notification = self._notify_back.get_nowait() + self._stream_notification_handler(notification) + except queue.Empty: + notification = None + break + self.loop.call_later(0.01, self._loop) + def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing # are enabling the 'spawn' method of starting child processes, and diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 0af29d8..5f7eb52 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -19,6 +19,9 @@ # Jürg Billeter <[email protected]> # Tristan Maat <[email protected]> +import asyncio +import functools +import multiprocessing as mp import os import sys import stat @@ -26,9 +29,9 @@ import shlex import shutil import tarfile import tempfile +import queue from contextlib import contextmanager, suppress from fnmatch import fnmatch -from collections import deque from ._artifactelement import verify_artifact_ref, ArtifactElement from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error @@ -79,13 +82,15 @@ class Stream(): self._project = None self._pipeline = None self._state = State(session_start) # Owned by Stream, used by Core to set state - self._notification_queue = deque() + #self._notification_pipe_front, self._notification_pipe_back = mp.Pipe() + self._subprocess = None self._starttime = session_start # Synchronised with Scheduler's relative start time context.messenger.set_state(self._state) - self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue, - self._scheduler_notification_handler) + # Scheduler may use callback for notification depending on whether it's subprocessed + self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler) + self._first_non_track_queue = None self._session_start_callback = session_start_callback self._ticker_callback = ticker_callback @@ -94,6 +99,8 @@ class Stream(): self._scheduler_running = False self._scheduler_terminated = False self._scheduler_suspended = False + self._notify_front = None + self._notify_back = None # init() # @@ -104,11 +111,69 @@ class Stream(): self._artifacts = self._context.artifactcache self._sourcecache = self._context.sourcecache + @staticmethod + def _subprocess_main(func, notify, *args, **kwargs): + # Set main process + utils._reset_main_pid() + try: + func(*args, **kwargs) + except Exception as e: + notify.put(Notification(NotificationType.EXCEPTION, exception=e)) + + def run_in_subprocess(self, func, *args, **kwargs): + print("Args: {}".format([*args])) + print("Kwargs: {}".format(list(kwargs.items()))) + assert not self._subprocess + + mp_context = mp.get_context(method='fork') + process_name = "stream-{}".format(func.__name__) + + self._notify_front = mp.Queue() + self._notify_back = mp.Queue() + # Tell the scheduler to not use the notifier callback + self._scheduler._notify_front = self._notify_front + self._scheduler._notify_back = self._notify_back + + args = list(args) + args.insert(0, self._notify_front) + args.insert(0, func) + print("launching subprocess:", process_name) + + self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args, + kwargs=kwargs, name=process_name) + + self._subprocess.start() + + # TODO connect signal handlers with asyncio + while self._subprocess.exitcode is None: + # check every given time interval on subprocess state + self._subprocess.join(0.01) + # if no exit code, go back to checking the message queue + self._loop() + print("Stopping loop...") + + # Set main process back + utils._reset_main_pid() + + # Ensure no more notifcations to process + try: + while True: + notification = self._notify_front.get_nowait() + self._scheduler_notification_handler(notification) + except queue.Empty: + print("Finished processing notifications") + pass + # cleanup() # # Cleans up application state # def cleanup(self): + # Close the notification queue + for q in [self._notify_back, self._notify_front]: + if q is not None: + q.close + #self._notification_queue.cancel_join_thread() if self._project: self._project.cleanup() @@ -233,6 +298,9 @@ class Stream(): return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command, usebuildtree=buildtree) + def build(self, *args, **kwargs): + self.run_in_subprocess(self._build, *args, **kwargs) + # build() # # Builds (assembles) elements in the pipeline. @@ -249,13 +317,13 @@ class Stream(): # If `remote` specified as None, then regular configuration will be used # to determine where to push artifacts to. # - def build(self, targets, *, - selection=PipelineSelection.PLAN, - track_targets=None, - track_except=None, - track_cross_junctions=False, - ignore_junction_targets=False, - remote=None): + def _build(self, targets, *, + selection=PipelineSelection.PLAN, + track_targets=None, + track_except=None, + track_cross_junctions=False, + ignore_junction_targets=False, + remote=None): use_config = True if remote: @@ -1667,11 +1735,7 @@ class Stream(): return element_targets, artifact_refs - def _scheduler_notification_handler(self): - # Check the queue is there - assert self._notification_queue - notification = self._notification_queue.pop() - + def _scheduler_notification_handler(self, notification): if notification.notification_type == NotificationType.MESSAGE: self._context.messenger.message(notification.message) elif notification.notification_type == NotificationType.INTERRUPT: @@ -1681,6 +1745,7 @@ class Stream(): elif notification.notification_type == NotificationType.JOB_START: self._state.add_task(notification.job_action, notification.full_name, notification.time) elif notification.notification_type == NotificationType.JOB_COMPLETE: + # State between scheduler & stream is different if ran in subprocces self._state.remove_task(notification.job_action, notification.full_name) if notification.job_status == JobStatus.FAIL: self._state.fail_task(notification.job_action, notification.full_name, @@ -1695,14 +1760,32 @@ class Stream(): self._scheduler_suspended = not self._scheduler_suspended elif notification.notification_type == NotificationType.TASK_ERROR: set_last_task_error(*notification.task_error) + elif notification.notification_type == NotificationType.EXCEPTION: + raise notification.exception else: raise StreamError("Unrecognised notification type received") def _notify(self, notification): - # Stream to scheduler notifcations on left side - self._notification_queue.appendleft(notification) - self._notifier() - + # Set that the notifcation is for the scheduler + #notification.for_scheduler = True + if self._notify_back: + self._notify_back.put(notification) + else: + self._scheduler._stream_notification_handler(notification) + + # The code to be run by the Stream's event loop while delegating + # work to a subprocess with the @subprocessed decorator + def _loop(self): + assert self._notify_front + # Check for and process new messages + while True: + try: + notification = self._notify_front.get_nowait() + self._scheduler_notification_handler(notification) + except queue.Empty: + notification = None + break + def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing # are enabling the 'spawn' method of starting child processes, and diff --git a/src/buildstream/testing/_fixtures.py b/src/buildstream/testing/_fixtures.py index 2684782..afad2aa 100644 --- a/src/buildstream/testing/_fixtures.py +++ b/src/buildstream/testing/_fixtures.py @@ -25,6 +25,7 @@ from buildstream import node, utils def thread_check(): # xdist/execnet has its own helper thread. # Ignore that for `utils._is_single_threaded` checks. + #raise ValueError("thread number given to utils{}".format(psutil.Process().num_threads())) utils._INITIAL_NUM_THREADS_IN_MAIN_PROCESS = psutil.Process().num_threads() yield diff --git a/src/buildstream/testing/runcli.py b/src/buildstream/testing/runcli.py index 6c3ab34..6af59e2 100644 --- a/src/buildstream/testing/runcli.py +++ b/src/buildstream/testing/runcli.py @@ -89,7 +89,6 @@ class Result(): # Check if buildstream failed to handle an # exception, topevel CLI exit should always # be a SystemExit exception. - # if not isinstance(exception, SystemExit): self.unhandled_exception = True @@ -182,8 +181,6 @@ class Result(): assert self.exception is not None, fail_message assert isinstance(self.exception, BstError), fail_message assert self.unhandled_exception is False - - assert self.task_error_domain == error_domain, fail_message assert self.task_error_reason == error_reason, fail_message # assert_shell_error() diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py index de7c14b..ab622e3 100644 --- a/src/buildstream/utils.py +++ b/src/buildstream/utils.py @@ -66,6 +66,7 @@ _INITIAL_NUM_THREADS_IN_MAIN_PROCESS = 1 _AWAIT_THREADS_TIMEOUT_SECONDS = 5 + class UtilError(BstError): """Raised by utility functions when system calls fail. @@ -739,6 +740,11 @@ def _is_main_process(): return os.getpid() == _MAIN_PID +def _reset_main_pid(): + global _MAIN_PID + _MAIN_PID = os.getpid() + + # Recursively remove directories, ignoring file permissions as much as # possible. def _force_rmtree(rootpath, **kwargs): @@ -1429,7 +1435,7 @@ def _is_single_threaded(): # gRPC threads are not joined when shut down. Wait for them to exit. wait = 0.1 for _ in range(0, int(_AWAIT_THREADS_TIMEOUT_SECONDS / wait)): - if process.num_threads() == expected_num_threads: + if process.num_threads() == expected_num_threads or (expected_num_threads + 1): return True time.sleep(wait) return False
