This is an automated email from the ASF dual-hosted git repository. not-in-ldap pushed a commit to branch tpollard/streamasync in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit d2d8f4856411ce2dc0c32110c75f8f7f88512beb Author: Tom Pollard <[email protected]> AuthorDate: Wed Sep 25 11:36:01 2019 +0100 Stop pickling exceptions, regen once off queue --- src/buildstream/_exceptions.py | 13 ++++++++++++- src/buildstream/_stream.py | 19 ++++++++----------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py index 947b831..fcf0c9e 100644 --- a/src/buildstream/_exceptions.py +++ b/src/buildstream/_exceptions.py @@ -113,6 +113,8 @@ class BstError(Exception): super().__init__(message) + self.message = message + # Additional error detail, these are used to construct detail # portions of the logging messages when encountered. # @@ -352,7 +354,6 @@ class StreamError(BstError): self.terminated = terminated - # AppError # # Raised from the frontend App directly @@ -378,3 +379,13 @@ class SkipJob(Exception): class ArtifactElementError(BstError): def __init__(self, message, *, detail=None, reason=None): super().__init__(message, detail=detail, domain=ErrorDomain.ELEMENT, reason=reason) + +class SubprocessException(BstError): + def __init__(self, **kwargs): + super().__init__(kwargs['message'], detail=kwargs['detail'], + domain=kwargs['domain'], reason=kwargs['reason'], temporary=kwargs['temporary']) + self.sandbox = kwargs['sandbox'] + try: + self.terminated = kwargs['terminated'] + except KeyError: + pass diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 5f7eb52..9b2fd5c 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -34,7 +34,7 @@ from contextlib import contextmanager, suppress from fnmatch import fnmatch from ._artifactelement import verify_artifact_ref, ArtifactElement -from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error +from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error, SubprocessException from ._message import Message, MessageType from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \ SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, Notification, JobStatus @@ -117,12 +117,12 @@ class Stream(): utils._reset_main_pid() try: func(*args, **kwargs) - except Exception as e: - notify.put(Notification(NotificationType.EXCEPTION, exception=e)) + except BstError as e: + # Send the exceptions members dict to be reraised in main process + exception_attrs = vars(e) + notify.put(Notification(NotificationType.EXCEPTION, exception=exception_attrs)) def run_in_subprocess(self, func, *args, **kwargs): - print("Args: {}".format([*args])) - print("Kwargs: {}".format(list(kwargs.items()))) assert not self._subprocess mp_context = mp.get_context(method='fork') @@ -137,7 +137,6 @@ class Stream(): args = list(args) args.insert(0, self._notify_front) args.insert(0, func) - print("launching subprocess:", process_name) self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args, kwargs=kwargs, name=process_name) @@ -150,7 +149,6 @@ class Stream(): self._subprocess.join(0.01) # if no exit code, go back to checking the message queue self._loop() - print("Stopping loop...") # Set main process back utils._reset_main_pid() @@ -161,9 +159,9 @@ class Stream(): notification = self._notify_front.get_nowait() self._scheduler_notification_handler(notification) except queue.Empty: - print("Finished processing notifications") pass + # cleanup() # # Cleans up application state @@ -1761,13 +1759,12 @@ class Stream(): elif notification.notification_type == NotificationType.TASK_ERROR: set_last_task_error(*notification.task_error) elif notification.notification_type == NotificationType.EXCEPTION: - raise notification.exception + # Regenerate the exception here, so we don't have to pickle it + raise SubprocessException(**notification.exception) else: raise StreamError("Unrecognised notification type received") def _notify(self, notification): - # Set that the notifcation is for the scheduler - #notification.for_scheduler = True if self._notify_back: self._notify_back.put(notification) else:
