This is an automated email from the ASF dual-hosted git repository. tvb pushed a commit to branch tpollard/temp in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit cf42285881dc7e75ff5278fc9ccc71e93c97dd80
Author: Tom Pollard <[email protected]>
AuthorDate: Thu Sep 26 11:50:15 2019 +0100

    Add notifications for session_start & task_groups
---
 src/buildstream/_scheduler/scheduler.py | 15 +++++++++++++--
 src/buildstream/_stream.py              | 11 +++++++++--
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 8a2dbf6..07263df 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -69,6 +69,8 @@ class NotificationType(FastEnum):
     MESSAGE = "message"
     TASK_ERROR = "task_error"
     EXCEPTION = "exception"
+    START = "start"
+    TASK_GROUPS = "task_groups"
 
 
 # Notification()
@@ -91,8 +93,8 @@ class Notification:
         element=None,
         message=None,
         task_error=None,
-        for_scheduler=None,
-        exception=None
+        exception=None,
+        task_groups=None
     ):
         self.notification_type = notification_type
         self.full_name = full_name
@@ -103,6 +105,7 @@ class Notification:
         self.message = message
         self.task_error = task_error # Tuple of domain & reason
         self.exception = exception
+        self.task_groups = task_groups
 
 
 # Scheduler()
@@ -228,6 +231,14 @@ class Scheduler:
         else:
             status = SchedStatus.SUCCESS
 
+        # Send the state taskgroups if we're running under the subprocess
+        if self._notify_front:
+            # Don't pickle state
+            for group in self._state.task_groups.values():
+                group._state = None
+            notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
+            self._notify_front.put(notification)
+
         return status
 
     # clear_queues()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 1b4ba0e..7bb2f5d 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1448,7 +1448,10 @@ class Stream:
         self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
 
         if self._session_start_callback is not None:
-            self._session_start_callback()
+            if self._notify_front:
+                self._notify_front.put(Notification(NotificationType.START))
+            else:
+                self._session_start_callback()
 
         status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
 
@@ -1739,7 +1742,9 @@ class Stream:
         return element_targets, artifact_refs
 
     def _scheduler_notification_handler(self, notification):
-        if notification.notification_type == NotificationType.MESSAGE:
+        if notification.notification_type == NotificationType.TASK_GROUPS:
+            self._state.task_groups = notification.task_groups
+        elif notification.notification_type == NotificationType.MESSAGE:
             self._context.messenger.message(notification.message)
         elif notification.notification_type == NotificationType.INTERRUPT:
             self._interrupt_callback()
@@ -1764,6 +1769,8 @@ class Stream:
             set_last_task_error(*notification.task_error)
         elif notification.notification_type == NotificationType.EXCEPTION:
             raise notification.exception.re_raise()
+        elif notification.notification_type == NotificationType.START:
+            self._session_start_callback()
         else:
             raise StreamError("Unrecognised notification type received")
 
