This is an automated email from the ASF dual-hosted git repository. tvb pushed a commit to branch tpollard/streamasync in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit f307ac2616802d60dd4e95319f98d50739a549b7
Author: Tom Pollard <[email protected]>
AuthorDate: Thu Sep 26 11:50:15 2019 +0100

    Add notifications for session_start & task_groups
---
 src/buildstream/_scheduler/scheduler.py | 15 +++++++++++++--
 src/buildstream/_stream.py              | 11 +++++++++--
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 1734782..62c2754 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -65,6 +65,8 @@ class NotificationType(FastEnum):
     MESSAGE = "message"
     TASK_ERROR = "task_error"
     EXCEPTION = "exception"
+    START = "start"
+    TASK_GROUPS = "task_groups"
 
 
 # Notification()
@@ -87,8 +89,8 @@ class Notification():
                  element=None,
                  message=None,
                  task_error=None,
-                 for_scheduler=False,
-                 exception=None):
+                 exception=None,
+                 task_groups=None):
         self.notification_type = notification_type
         self.full_name = full_name
         self.job_action = job_action
@@ -98,6 +100,7 @@ class Notification():
         self.message = message
         self.task_error = task_error  # Tuple of domain & reason
         self.exception = exception
+        self.task_groups = task_groups
 
 
 # Scheduler()
@@ -217,6 +220,14 @@ class Scheduler():
         else:
             status = SchedStatus.SUCCESS
 
+        # Send the state taskgroups if we're running under the subprocess
+        if self._notify_front:
+            # Don't pickle state
+            for group in self._state.task_groups.values():
+                group._state = None
+            notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
+            self._notify_front.put(notification)
+
         return status
 
     # clear_queues()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 9b2fd5c..c0bd110 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1439,7 +1439,10 @@ class Stream():
         self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
 
         if self._session_start_callback is not None:
-            self._session_start_callback()
+            if self._notify_front:
+                self._notify_front.put(Notification(NotificationType.START))
+            else:
+                self._session_start_callback()
 
         status = self._scheduler.run(self.queues)
 
@@ -1734,7 +1737,9 @@ class Stream():
         return element_targets, artifact_refs
 
     def _scheduler_notification_handler(self, notification):
-        if notification.notification_type == NotificationType.MESSAGE:
+        if notification.notification_type == NotificationType.TASK_GROUPS:
+            self._state.task_groups = notification.task_groups
+        elif notification.notification_type == NotificationType.MESSAGE:
             self._context.messenger.message(notification.message)
         elif notification.notification_type == NotificationType.INTERRUPT:
             self._interrupt_callback()
@@ -1761,6 +1766,8 @@ class Stream():
         elif notification.notification_type == NotificationType.EXCEPTION:
             # Regenerate the exception here, so we don't have to pickle it
             raise SubprocessException(**notification.exception)
+        elif notification.notification_type == NotificationType.START:
+            self._session_start_callback()
         else:
             raise StreamError("Unrecognised notification type received")
