This is an automated email from the ASF dual-hosted git repository. tvb pushed a commit to branch tpollard/streamasync in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 40934034e902c9776ee3a74df55e6d6bab98d922 Author: Tom Pollard <[email protected]> AuthorDate: Tue Sep 10 15:10:04 2019 +0100 scheduler.py: Notification for last_task_error propagation Add a notification for TASK_ERROR. As queues & job handlers will be running in a different process to the front end, the global last-task-error state held in the frontend process (managed in _exceptions.py) needs to be updated via a notification. This is used internally for the BST_TEST_SUITE. --- src/buildstream/_scheduler/jobs/job.py | 6 +++--- src/buildstream/_scheduler/queues/queue.py | 4 ++-- src/buildstream/_scheduler/scheduler.py | 20 +++++++++++++++++++- src/buildstream/_stream.py | 4 +++- 4 files changed, 27 insertions(+), 7 deletions(-) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 913e27e..0bb72a3 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -30,7 +30,7 @@ import asyncio import multiprocessing # BuildStream toplevel imports -from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob +from ..._exceptions import ImplError, BstError, SkipJob from ..._message import Message, MessageType, unconditional_messages from ...types import FastEnum from ... 
import _signals, utils @@ -541,8 +541,8 @@ class Job(): # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state # is currently managed in _exceptions.py - set_last_task_error(envelope.message['domain'], - envelope.message['reason']) + self._scheduler.set_last_task_error(envelope.message['domain'], + envelope.message['reason']) elif envelope.message_type is _MessageType.RESULT: assert self._result is None self._result = envelope.message diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 6c6dfdc..b95ca56 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -30,7 +30,7 @@ from ..jobs import ElementJob, JobStatus from ..resources import ResourceType # BuildStream toplevel imports -from ..._exceptions import BstError, ImplError, set_last_task_error +from ..._exceptions import BstError, ImplError from ..._message import Message, MessageType from ...types import FastEnum @@ -320,7 +320,7 @@ class Queue(): # # This just allows us stronger testing capability # - set_last_task_error(e.domain, e.reason) + self._scheduler.set_last_task_error(e.domain, e.reason) except Exception: # pylint: disable=broad-except diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index d0a1895..fa76661 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -62,6 +62,7 @@ class NotificationType(FastEnum): SUSPENDED = "suspended" RETRY = "retry" MESSAGE = "message" + TASK_ERROR = "task_error" # Notification() @@ -82,7 +83,8 @@ class Notification(): job_status=None, time=None, element=None, - message=None): + message=None, + task_error=None): self.notification_type = notification_type self.full_name = full_name self.job_action = job_action @@ -90,6 +92,7 @@ class Notification(): self.time = time self.element = element 
self.message = message + self.task_error = task_error # Tuple of domain & reason # Scheduler() @@ -315,6 +318,21 @@ class Scheduler(): def notify_messenger(self, message): self._notify(Notification(NotificationType.MESSAGE, message=message)) + # set_last_task_error() + # + # Save the last error domain / reason reported from a child job or queue + # in the main process. + # + # Args: + # domain (ErrorDomain): Enum for the domain from which the error occurred + # reason (str): String identifier representing the reason for the error + # + def set_last_task_error(self, domain, reason): + task_error = domain, reason + notification = Notification(NotificationType.TASK_ERROR, + task_error=task_error) + self._notify(notification) + ####################################################### # Local Private Methods # ####################################################### diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 75b3dd8..0af29d8 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -31,7 +31,7 @@ from fnmatch import fnmatch from collections import deque from ._artifactelement import verify_artifact_ref, ArtifactElement -from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError +from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error from ._message import Message, MessageType from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \ SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, Notification, JobStatus @@ -1693,6 +1693,8 @@ class Stream(): self._scheduler_terminated = True elif notification.notification_type == NotificationType.SUSPENDED: self._scheduler_suspended = not self._scheduler_suspended + elif notification.notification_type == NotificationType.TASK_ERROR: + set_last_task_error(*notification.task_error) else: raise StreamError("Unrecognised notification type received")
